diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/ContainerLaunchContext.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/ContainerLaunchContext.java index 932945b..09584ec 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/ContainerLaunchContext.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/ContainerLaunchContext.java @@ -24,6 +24,7 @@ import org.apache.hadoop.classification.InterfaceAudience.Public; import org.apache.hadoop.classification.InterfaceStability.Stable; +import org.apache.hadoop.classification.InterfaceStability.Unstable; import org.apache.hadoop.yarn.api.ContainerManagementProtocol; import org.apache.hadoop.yarn.server.api.ApplicationInitializationContext; import org.apache.hadoop.yarn.server.api.AuxiliaryService; @@ -46,6 +47,7 @@ *
  • Optional, application-specific binary service data.
  • *
  • Environment variables for the launched process.
  • *
  • Command to launch the container.
  • + *
  • Retry strategy to apply when the container fails to run.
  • * * * @see ContainerManagementProtocol#startContainers(org.apache.hadoop.yarn.api.protocolrecords.StartContainersRequest) @@ -61,6 +63,17 @@ public static ContainerLaunchContext newInstance( Map environment, List commands, Map serviceData, ByteBuffer tokens, Map acls) { + return newInstance(localResources, environment, commands, serviceData, + tokens, acls, null); + } + + @Public + @Unstable + public static ContainerLaunchContext newInstance( + Map localResources, + Map environment, List commands, + Map serviceData, ByteBuffer tokens, + Map acls, ContainerRetry containerRetry) { ContainerLaunchContext container = Records.newRecord(ContainerLaunchContext.class); container.setLocalResources(localResources); @@ -69,6 +82,7 @@ public static ContainerLaunchContext newInstance( container.setServiceData(serviceData); container.setTokens(tokens); container.setApplicationACLs(acls); + container.setContainerRetry(containerRetry); return container; } @@ -195,4 +209,20 @@ public static ContainerLaunchContext newInstance( @Public @Stable public abstract void setApplicationACLs(Map acls); + + /** + * Get the ContainerRetry to relaunch container. + * @return ContainerRetry to relaunch container. + */ + @Public + @Unstable + public abstract ContainerRetry getContainerRetry(); + + /** + * Set the ContainerRetry to relaunch container. + * @param containerRetry ContainerRetry to relaunch container. 
+ */ + @Public + @Unstable + public abstract void setContainerRetry(ContainerRetry containerRetry); } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/ContainerRetry.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/ContainerRetry.java new file mode 100644 index 0000000..b3db774 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/ContainerRetry.java @@ -0,0 +1,76 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.api.records; + +import org.apache.hadoop.classification.InterfaceAudience.Private; +import org.apache.hadoop.classification.InterfaceAudience.Public; +import org.apache.hadoop.classification.InterfaceStability.Unstable; +import org.apache.hadoop.yarn.util.Records; + +import java.util.Set; + +/** + * {@code ContainerRetry} indicates how container retry after it fails to run. + *

    + * It provides details such as: + *

      + *
    • + * {@link ContainerRetryPolicy} : NEVER_RETRY (never retry when the container + * fails to run, whatever the error code), ALWAYS_RETRY (always retry when + * the container fails to run, whatever the error code), + * RETRY_ON_SPECIFIC_ERROR_CODE (when the container fails to run, retry if the + * error code is one of errorCodes, otherwise do not retry). + * Note: if the error code is 137 (SIGKILL) or 143 (SIGTERM), the container will + * not be retried because it was usually killed on purpose. + *
    • + *
    • + * retryTimes specifies how many times to retry when a retry is needed; + * a value of -1 means retry forever. + *
    • + *
    • retryInterval specifies how long to wait before relaunching the + * container, in seconds.
    • + *
    + */ +@Public +@Unstable +public abstract class ContainerRetry { + public static final int RETRY_FOREVER = -1; + public static final int RETRY_INVALID = -1000; + + @Private + @Unstable + public static ContainerRetry newInstance(ContainerRetryPolicy retryPolicy, + Set errorCodes, int retryTimes, int retryInterval) { + ContainerRetry containerRetry = Records.newRecord(ContainerRetry.class); + containerRetry.setRetryPolicy(retryPolicy); + containerRetry.setErrorCodes(errorCodes); + containerRetry.setRetryTimes(retryTimes); + containerRetry.setRetryInterval(retryInterval); + return containerRetry; + } + + public abstract ContainerRetryPolicy getRetryPolicy(); + public abstract void setRetryPolicy(ContainerRetryPolicy retryPolicy); + public abstract Set getErrorCodes(); + public abstract void setErrorCodes(Set errorCodes); + public abstract int getRetryTimes(); + public abstract void setRetryTimes(int retryTimes); + public abstract int getRetryInterval(); + public abstract void setRetryInterval(int retryInterval); +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/ContainerRetryPolicy.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/ContainerRetryPolicy.java new file mode 100644 index 0000000..a3816c1 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/ContainerRetryPolicy.java @@ -0,0 +1,35 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.yarn.api.records; + +import org.apache.hadoop.classification.InterfaceAudience.Public; +import org.apache.hadoop.classification.InterfaceStability.Unstable; + +/** + *

    Retry policy for relaunching a Container.

    + */ +@Public +@Unstable +public enum ContainerRetryPolicy { + /** Never retry. */ + NEVER_RETRY, + /** Always retry. */ + ALWAYS_RETRY, + /** Retry for specific error codes. */ + RETRY_ON_SPECIFIC_ERROR_CODE +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/yarn_protos.proto b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/yarn_protos.proto index 3c208e2..eb7f5fc 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/yarn_protos.proto +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/yarn_protos.proto @@ -482,6 +482,7 @@ message ContainerLaunchContextProto { repeated StringStringMapProto environment = 4; repeated string command = 5; repeated ApplicationACLMapProto application_ACLs = 6; + optional ContainerRetryProto container_retry = 7; } message ContainerStatusProto { @@ -504,6 +505,19 @@ message ContainerResourceChangeRequestProto { optional ResourceProto capability = 2; } +message ContainerRetryProto { + optional ContainerRetryPolicyProto retry_policy = 1 [default = NEVER_RETRY]; + repeated int32 error_codes = 2; + optional int32 retry_times = 3 [default = 0]; + optional int32 retry_interval = 4 [default = 0]; +} + +enum ContainerRetryPolicyProto { + NEVER_RETRY = 0; + ALWAYS_RETRY = 1; + RETRY_ON_SPECIFIC_ERROR_CODE = 2; +} + //////////////////////////////////////////////////////////////////////// ////// From common////////////////////////////////////////////////////// //////////////////////////////////////////////////////////////////////// diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/ApplicationMaster.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/ApplicationMaster.java index f410c43..76f1037 100644 --- 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/ApplicationMaster.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/ApplicationMaster.java @@ -32,6 +32,7 @@ import java.util.ArrayList; import java.util.Collections; import java.util.HashMap; +import java.util.HashSet; import java.util.Iterator; import java.util.List; import java.util.Map; @@ -76,6 +77,8 @@ import org.apache.hadoop.yarn.api.records.ContainerExitStatus; import org.apache.hadoop.yarn.api.records.ContainerId; import org.apache.hadoop.yarn.api.records.ContainerLaunchContext; +import org.apache.hadoop.yarn.api.records.ContainerRetry; +import org.apache.hadoop.yarn.api.records.ContainerRetryPolicy; import org.apache.hadoop.yarn.api.records.ContainerState; import org.apache.hadoop.yarn.api.records.ContainerStatus; import org.apache.hadoop.yarn.api.records.FinalApplicationStatus; @@ -253,6 +256,13 @@ // File length needed for local resource private long shellScriptPathLen = 0; + // Container retry options + private ContainerRetryPolicy containerRetryPolicy = + ContainerRetryPolicy.NEVER_RETRY; + private Set errorCodes = null; + private int retryTimes = 0; + private int retryInterval = 0; + // Timeline domain ID private String domainId = null; @@ -371,6 +381,17 @@ public boolean init(String[] args) throws ParseException, IOException { opts.addOption("num_containers", true, "No. of containers on which the shell command needs to be executed"); opts.addOption("priority", true, "Application Priority. 
Default 0"); + opts.addOption("retry_policy", true, + "Retry policy when container fails to run, " + + "0: NEVER_RETRY, 1: ALWAYS_RETRY, " + + "2: RETRY_ON_SPECIFIC_ERROR_CODE"); + opts.addOption("error_codes", true, + "When retry policy is set to RETRY_ON_SPECIFIC_ERROR_CODE, error codes " + + "is specified with this option, e.g. --error_codes 1,2,3"); + opts.addOption("retry_times", true, "If container could retry, it specifies" + + " retry times"); + opts.addOption("retry_interval", true, "interval between each retry, unit " + + "is seconds"); opts.addOption("debug", false, "Dump out debug information"); opts.addOption("help", false, "Print usage"); @@ -508,6 +529,19 @@ public boolean init(String[] args) throws ParseException, IOException { } requestPriority = Integer.parseInt(cliParser .getOptionValue("priority", "0")); + + containerRetryPolicy = ContainerRetryPolicy.values()[ + Integer.parseInt(cliParser.getOptionValue("retry_policy", "0"))]; + if (cliParser.hasOption("error_codes")) { + errorCodes = new HashSet<>(); + for (String errorCode : + cliParser.getOptionValue("error_codes").split(",")) { + errorCodes.add(Integer.parseInt(errorCode)); + } + } + retryTimes = Integer.parseInt(cliParser.getOptionValue("retry_times", "0")); + retryInterval = Integer.parseInt(cliParser.getOptionValue( + "retry_interval", "0")); return true; } @@ -1062,9 +1096,11 @@ public void run() { // "hadoop dfs" command inside the distributed shell. 
Map myShellEnv = new HashMap(shellEnv); myShellEnv.put(YARN_SHELL_ID, shellId); + ContainerRetry containerRetry = ContainerRetry.newInstance( + containerRetryPolicy, errorCodes, retryTimes, retryInterval); ContainerLaunchContext ctx = ContainerLaunchContext.newInstance( localResources, myShellEnv, commands, null, allTokens.duplicate(), - null); + null, containerRetry); containerListener.addContainer(container.getId(), container); nmClientAsync.startContainerAsync(container, ctx); } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/Client.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/Client.java index 68d2bde..bc89cce 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/Client.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/Client.java @@ -168,6 +168,8 @@ private long attemptFailuresValidityInterval = -1; + private Vector containerRetryOptions = new Vector<>(5); + // Debug flag boolean debugFlag = false; @@ -287,6 +289,17 @@ public Client(Configuration conf) throws Exception { + " will be allocated, \"\" means containers" + " can be allocated anywhere, if you don't specify the option," + " default node_label_expression of queue will be used."); + opts.addOption("retry_policy", true, + "Retry policy when container fails to run, " + + "0: NEVER_RETRY, 1: ALWAYS_RETRY, " + + "2: RETRY_ON_SPECIFIC_ERROR_CODE"); + opts.addOption("error_codes", true, + "When retry policy is set to RETRY_ON_SPECIFIC_ERROR_CODE, error codes " + + "is specified with this option, e.g. 
--error_codes 1,2,3"); + opts.addOption("retry_times", true, "If container could retry, it specifies" + + "retry times"); + opts.addOption("retry_interval", true, "interval between each retry, unit " + + "is seconds"); } /** @@ -429,6 +442,24 @@ public boolean init(String[] args) throws ParseException { } } + // Get container retry options + if (cliParser.hasOption("retry_policy")) { + containerRetryOptions.add("--retry_policy " + + cliParser.getOptionValue("retry_policy")); + } + if (cliParser.hasOption("error_codes")) { + containerRetryOptions.add("--error_codes " + + cliParser.getOptionValue("error_codes")); + } + if (cliParser.hasOption("retry_times")) { + containerRetryOptions.add("--retry_times " + + cliParser.getOptionValue("retry_times")); + } + if (cliParser.hasOption("retry_interval")) { + containerRetryOptions.add("--retry_interval " + + cliParser.getOptionValue("retry_interval")); + } + return true; } @@ -638,6 +669,8 @@ public boolean run() throws IOException, YarnException { vargs.add("--debug"); } + vargs.addAll(containerRetryOptions); + vargs.add("1>" + ApplicationConstants.LOG_DIR_EXPANSION_VAR + "/AppMaster.stdout"); vargs.add("2>" + ApplicationConstants.LOG_DIR_EXPANSION_VAR + "/AppMaster.stderr"); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/records/impl/pb/ContainerLaunchContextPBImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/records/impl/pb/ContainerLaunchContextPBImpl.java index 12dcfcd..1bd815c 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/records/impl/pb/ContainerLaunchContextPBImpl.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/records/impl/pb/ContainerLaunchContextPBImpl.java @@ -29,10 +29,12 @@ import org.apache.hadoop.classification.InterfaceStability.Unstable; import 
org.apache.hadoop.yarn.api.records.ApplicationAccessType; import org.apache.hadoop.yarn.api.records.ContainerLaunchContext; +import org.apache.hadoop.yarn.api.records.ContainerRetry; import org.apache.hadoop.yarn.api.records.LocalResource; import org.apache.hadoop.yarn.proto.YarnProtos.ApplicationACLMapProto; import org.apache.hadoop.yarn.proto.YarnProtos.ContainerLaunchContextProto; import org.apache.hadoop.yarn.proto.YarnProtos.ContainerLaunchContextProtoOrBuilder; +import org.apache.hadoop.yarn.proto.YarnProtos.ContainerRetryProto; import org.apache.hadoop.yarn.proto.YarnProtos.LocalResourceProto; import org.apache.hadoop.yarn.proto.YarnProtos.StringBytesMapProto; import org.apache.hadoop.yarn.proto.YarnProtos.StringLocalResourceMapProto; @@ -56,6 +58,7 @@ private Map environment = null; private List commands = null; private Map applicationACLS = null; + private ContainerRetry containerRetry = null; public ContainerLaunchContextPBImpl() { builder = ContainerLaunchContextProto.newBuilder(); @@ -120,6 +123,9 @@ private void mergeLocalToBuilder() { if (this.applicationACLS != null) { addApplicationACLs(); } + if (this.containerRetry != null) { + builder.setContainerRetry(convertToProtoFormat(this.containerRetry)); + } } private void mergeLocalToProto() { @@ -456,6 +462,28 @@ public void setApplicationACLs( this.applicationACLS.putAll(appACLs); } + @Override + public ContainerRetry getContainerRetry() { + ContainerLaunchContextProtoOrBuilder p = viaProto ? 
proto : builder; + if (this.containerRetry != null) { + return this.containerRetry; + } + if (!p.hasContainerRetry()) { + return null; + } + this.containerRetry = convertFromProtoFormat(p.getContainerRetry()); + return this.containerRetry; + } + + @Override + public void setContainerRetry(ContainerRetry retry) { + maybeInitBuilder(); + if (retry == null) { + builder.clearContainerRetry(); + } + this.containerRetry = retry; + } + private LocalResourcePBImpl convertFromProtoFormat(LocalResourceProto p) { return new LocalResourcePBImpl(p); } @@ -463,4 +491,12 @@ private LocalResourcePBImpl convertFromProtoFormat(LocalResourceProto p) { private LocalResourceProto convertToProtoFormat(LocalResource t) { return ((LocalResourcePBImpl)t).getProto(); } -} + + private ContainerRetryPBImpl convertFromProtoFormat(ContainerRetryProto p) { + return new ContainerRetryPBImpl(p); + } + + private ContainerRetryProto convertToProtoFormat(ContainerRetry t) { + return ((ContainerRetryPBImpl) t).getProto(); + } +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/records/impl/pb/ContainerRetryPBImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/records/impl/pb/ContainerRetryPBImpl.java new file mode 100644 index 0000000..912ad62 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/records/impl/pb/ContainerRetryPBImpl.java @@ -0,0 +1,176 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.api.records.impl.pb; + + +import com.google.protobuf.TextFormat; +import org.apache.hadoop.yarn.api.records.ContainerRetry; +import org.apache.hadoop.yarn.api.records.ContainerRetryPolicy; +import org.apache.hadoop.yarn.proto.YarnProtos.ContainerRetryPolicyProto; +import org.apache.hadoop.yarn.proto.YarnProtos.ContainerRetryProto; +import org.apache.hadoop.yarn.proto.YarnProtos.ContainerRetryProtoOrBuilder; + +import java.util.HashSet; +import java.util.Set; + +/** + * Implementation of ContainerRetry. + */ +public class ContainerRetryPBImpl extends ContainerRetry{ + private ContainerRetryProto proto = ContainerRetryProto.getDefaultInstance(); + private ContainerRetryProto.Builder builder = null; + private boolean viaProto = false; + + private Set errorCodes = null; + + public ContainerRetryPBImpl() { + builder = ContainerRetryProto.newBuilder(); + } + + public ContainerRetryPBImpl(ContainerRetryProto proto) { + this.proto = proto; + viaProto = true; + } + + public ContainerRetryProto getProto() { + mergeLocalToProto(); + proto = viaProto ? 
proto : builder.build(); + viaProto = true; + return proto; + } + + @Override + public int hashCode() { + return getProto().hashCode(); + } + + @Override + public boolean equals(Object other) { + if (other == null) { + return false; + } + if (other.getClass().isAssignableFrom(this.getClass())) { + return this.getProto().equals(this.getClass().cast(other).getProto()); + } + return false; + } + + @Override + public String toString() { + return TextFormat.shortDebugString(getProto()); + } + + private void mergeLocalToBuilder() { + if (this.errorCodes != null) { + builder.clearErrorCodes(); + builder.addAllErrorCodes(this.errorCodes); + } + } + + private void mergeLocalToProto() { + if (viaProto) { + maybeInitBuilder(); + } + mergeLocalToBuilder(); + proto = builder.build(); + viaProto = true; + } + + private void maybeInitBuilder() { + if (viaProto || builder == null) { + builder = ContainerRetryProto.newBuilder(proto); + } + viaProto = false; + } + + public ContainerRetryPolicy getRetryPolicy() { + ContainerRetryProtoOrBuilder p = viaProto ? proto : builder; + if (!p.hasRetryPolicy()) { + return ContainerRetryPolicy.NEVER_RETRY; + } + return convertFromProtoFormat(p.getRetryPolicy()); + } + + public void setRetryPolicy(ContainerRetryPolicy containerRetryPolicy) { + maybeInitBuilder(); + if (containerRetryPolicy == null) { + builder.clearRetryPolicy(); + return; + } + builder.setRetryPolicy(convertToProtoFormat(containerRetryPolicy)); + } + + private void initErrorCodes() { + if (this.errorCodes != null) { + return; + } + ContainerRetryProtoOrBuilder p = viaProto ? 
proto : builder; + this.errorCodes = new HashSet<>(); + this.errorCodes.addAll(p.getErrorCodesList()); + } + + public Set getErrorCodes() { + initErrorCodes(); + return this.errorCodes; + } + + public void setErrorCodes(Set errCodes) { + maybeInitBuilder(); + if (errCodes == null || errCodes.isEmpty()) { + builder.clearErrorCodes(); + } + this.errorCodes = errCodes; + } + + public int getRetryTimes() { + ContainerRetryProtoOrBuilder p = viaProto ? proto : builder; + if (!p.hasRetryTimes()) { + return 0; + } + return p.getRetryTimes(); + } + + public void setRetryTimes(int retryTimes) { + maybeInitBuilder(); + builder.setRetryTimes(retryTimes); + } + + public int getRetryInterval() { + ContainerRetryProtoOrBuilder p = viaProto ? proto : builder; + if (!p.hasRetryInterval()) { + return 0; + } + return p.getRetryInterval(); + } + + public void setRetryInterval(int retryInterval) { + maybeInitBuilder(); + builder.setRetryInterval(retryInterval); + } + + private ContainerRetryPolicyProto convertToProtoFormat( + ContainerRetryPolicy containerRetryPolicy) { + return ProtoUtils.convertToProtoFormat(containerRetryPolicy); + } + + private ContainerRetryPolicy convertFromProtoFormat( + ContainerRetryPolicyProto containerRetryPolicyProto) { + return ProtoUtils.convertFromProtoFormat(containerRetryPolicyProto); + } +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/records/impl/pb/ProtoUtils.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/records/impl/pb/ProtoUtils.java index e742f4c..7abc038 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/records/impl/pb/ProtoUtils.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/records/impl/pb/ProtoUtils.java @@ -26,6 +26,7 @@ import org.apache.hadoop.yarn.api.records.AMCommand; import 
org.apache.hadoop.yarn.api.records.ApplicationAccessType; import org.apache.hadoop.yarn.api.records.ApplicationResourceUsageReport; +import org.apache.hadoop.yarn.api.records.ContainerRetryPolicy; import org.apache.hadoop.yarn.api.records.ContainerState; import org.apache.hadoop.yarn.api.records.FinalApplicationStatus; import org.apache.hadoop.yarn.api.records.LocalResourceType; @@ -53,6 +54,7 @@ import org.apache.hadoop.yarn.proto.YarnProtos.ReservationRequestInterpreterProto; import org.apache.hadoop.yarn.proto.YarnProtos.YarnApplicationAttemptStateProto; import org.apache.hadoop.yarn.proto.YarnProtos.YarnApplicationStateProto; +import org.apache.hadoop.yarn.proto.YarnProtos.ContainerRetryPolicyProto; import org.apache.hadoop.yarn.proto.YarnProtos.ContainerTypeProto; import org.apache.hadoop.yarn.proto.YarnServiceProtos; import org.apache.hadoop.yarn.server.api.ContainerType; @@ -282,4 +284,17 @@ public static ContainerTypeProto convertToProtoFormat(ContainerType e) { public static ContainerType convertFromProtoFormat(ContainerTypeProto e) { return ContainerType.valueOf(e.name()); } + + /* + * ContainerRetryPolicy + */ + public static ContainerRetryPolicyProto convertToProtoFormat( + ContainerRetryPolicy e) { + return ContainerRetryPolicyProto.valueOf(e.name()); + } + + public static ContainerRetryPolicy convertFromProtoFormat( + ContainerRetryPolicyProto e) { + return ContainerRetryPolicy.valueOf(e.name()); + } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/api/TestPBImplRecords.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/api/TestPBImplRecords.java index b7f5ff7..fefdc12 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/api/TestPBImplRecords.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/api/TestPBImplRecords.java @@ -118,6 +118,7 @@ import 
org.apache.hadoop.yarn.api.records.ContainerLaunchContext; import org.apache.hadoop.yarn.api.records.ContainerReport; import org.apache.hadoop.yarn.api.records.ContainerResourceChangeRequest; +import org.apache.hadoop.yarn.api.records.ContainerRetry; import org.apache.hadoop.yarn.api.records.ContainerStatus; import org.apache.hadoop.yarn.api.records.LocalResource; import org.apache.hadoop.yarn.api.records.LogAggregationContext; @@ -159,6 +160,7 @@ import org.apache.hadoop.yarn.api.records.impl.pb.ContainerPBImpl; import org.apache.hadoop.yarn.api.records.impl.pb.ContainerReportPBImpl; import org.apache.hadoop.yarn.api.records.impl.pb.ContainerResourceChangeRequestPBImpl; +import org.apache.hadoop.yarn.api.records.impl.pb.ContainerRetryPBImpl; import org.apache.hadoop.yarn.api.records.impl.pb.ContainerStatusPBImpl; import org.apache.hadoop.yarn.api.records.impl.pb.LocalResourcePBImpl; import org.apache.hadoop.yarn.api.records.impl.pb.NMTokenPBImpl; @@ -192,6 +194,7 @@ import org.apache.hadoop.yarn.proto.YarnProtos.ContainerProto; import org.apache.hadoop.yarn.proto.YarnProtos.ContainerReportProto; import org.apache.hadoop.yarn.proto.YarnProtos.ContainerResourceChangeRequestProto; +import org.apache.hadoop.yarn.proto.YarnProtos.ContainerRetryProto; import org.apache.hadoop.yarn.proto.YarnProtos.ContainerStatusProto; import org.apache.hadoop.yarn.proto.YarnProtos.LocalResourceProto; import org.apache.hadoop.yarn.proto.YarnProtos.NodeIdProto; @@ -345,7 +348,7 @@ private static Object genTypeValue(Type type) { return rand.nextBoolean(); } else if (type.equals(byte.class)) { return bytes[rand.nextInt(4)]; - } else if (type.equals(int.class)) { + } else if (type.equals(int.class) || type.equals(Integer.class)) { return rand.nextInt(1000000); } else if (type.equals(long.class)) { return Long.valueOf(rand.nextInt(1000000)); @@ -469,6 +472,7 @@ public static void setup() throws Exception { generateByNewInstance(ApplicationResourceUsageReport.class); 
generateByNewInstance(ApplicationReport.class); generateByNewInstance(Container.class); + generateByNewInstance(ContainerRetry.class); generateByNewInstance(ContainerLaunchContext.class); generateByNewInstance(ApplicationSubmissionContext.class); generateByNewInstance(ContainerReport.class); @@ -957,6 +961,11 @@ public void testContainerIdPBImpl() throws Exception { } @Test + public void testContainerRetryPBImpl() throws Exception { + validatePBImplRecord(ContainerRetryPBImpl.class, ContainerRetryProto.class); + } + + @Test public void testContainerLaunchContextPBImpl() throws Exception { validatePBImplRecord(ContainerLaunchContextPBImpl.class, ContainerLaunchContextProto.class); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/ContainerExecutor.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/ContainerExecutor.java index 8c74bf5..d08ee67 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/ContainerExecutor.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/ContainerExecutor.java @@ -308,6 +308,7 @@ public void writeLaunchEnv(OutputStream out, } public enum ExitCode { + SUCCESS(0), FORCE_KILLED(137), TERMINATED(143), LOST(154); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/DefaultContainerExecutor.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/DefaultContainerExecutor.java index 9ccde5f..6437fc5 100644 --- 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/DefaultContainerExecutor.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/DefaultContainerExecutor.java @@ -89,6 +89,11 @@ public DefaultContainerExecutor() { protected void copyFile(Path src, Path dst, String owner) throws IOException { lfs.util().copy(src, dst); } + + protected void copyFile(Path src, Path dst, String owner, boolean overwrite) + throws IOException { + lfs.util().copy(src, dst, false, overwrite); + } protected void setScriptExecutable(Path script, String owner) throws IOException { lfs.setPermission(script, ContainerExecutor.TASK_LAUNCH_SCRIPT_PERMISSION); @@ -178,12 +183,12 @@ public int launchContainer(ContainerStartContext ctx) throws IOException { // copy container tokens to work dir Path tokenDst = new Path(containerWorkDir, ContainerLaunch.FINAL_CONTAINER_TOKENS_FILE); - copyFile(nmPrivateTokensPath, tokenDst, user); + copyFile(nmPrivateTokensPath, tokenDst, user, true); // copy launch script to work dir Path launchDst = new Path(containerWorkDir, ContainerLaunch.CONTAINER_SCRIPT); - copyFile(nmPrivateContainerScriptPath, launchDst, user); + copyFile(nmPrivateContainerScriptPath, launchDst, user, true); // Create new local launch wrapper script LocalWrapperScriptBuilder sb = getLocalWrapperScriptBuilder( diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/ContainerManagerImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/ContainerManagerImpl.java index f44de59..db76e7e 100644 --- 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/ContainerManagerImpl.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/ContainerManagerImpl.java @@ -349,7 +349,8 @@ private void recoverContainer(RecoveredContainerState rcs) Container container = new ContainerImpl(getConfig(), dispatcher, context.getNMStateStore(), req.getContainerLaunchContext(), credentials, metrics, token, rcs.getStatus(), rcs.getExitCode(), - rcs.getDiagnostics(), rcs.getKilled(), rcs.getCapability()); + rcs.getDiagnostics(), rcs.getKilled(), rcs.getCapability(), + rcs.getRemainRetries()); context.getContainers().put(containerId, container); dispatcher.getEventHandler().handle( new ApplicationContainerInitEvent(container)); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/Container.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/Container.java index 1d2ec56..64b6a65 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/Container.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/Container.java @@ -55,6 +55,8 @@ NMContainerStatus getNMContainerStatus(); + boolean shouldRetry(int errorCode); + String toString(); } diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/ContainerImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/ContainerImpl.java index e16ea93..fc212af 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/ContainerImpl.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/ContainerImpl.java @@ -41,6 +41,8 @@ import org.apache.hadoop.yarn.api.records.ContainerExitStatus; import org.apache.hadoop.yarn.api.records.ContainerId; import org.apache.hadoop.yarn.api.records.ContainerLaunchContext; +import org.apache.hadoop.yarn.api.records.ContainerRetry; +import org.apache.hadoop.yarn.api.records.ContainerRetryPolicy; import org.apache.hadoop.yarn.api.records.ContainerStatus; import org.apache.hadoop.yarn.api.records.LocalResource; import org.apache.hadoop.yarn.api.records.LocalResourceVisibility; @@ -50,6 +52,7 @@ import org.apache.hadoop.yarn.event.EventHandler; import org.apache.hadoop.yarn.security.ContainerTokenIdentifier; import org.apache.hadoop.yarn.server.api.protocolrecords.NMContainerStatus; +import org.apache.hadoop.yarn.server.nodemanager.ContainerExecutor.ExitCode; import org.apache.hadoop.yarn.server.nodemanager.NMAuditLogger; import org.apache.hadoop.yarn.server.nodemanager.NMAuditLogger.AuditConstants; import org.apache.hadoop.yarn.server.nodemanager.containermanager.AuxServicesEvent; @@ -100,6 +103,9 @@ private long containerLocalizationStartTime; private long containerLaunchStartTime; private static Clock clock = new SystemClock(); + private final ContainerRetry 
containerRetry; + // remain retries to relaunch container if needed + private int remainRetries; /** The NM-wide configuration - not specific to this container */ private final Configuration daemonConf; @@ -134,6 +140,14 @@ public ContainerImpl(Configuration conf, Dispatcher dispatcher, this.dispatcher = dispatcher; this.stateStore = stateStore; this.launchContext = launchContext; + if (launchContext != null) { + this.containerRetry = launchContext.getContainerRetry(); + if (this.containerRetry != null) { + remainRetries = containerRetry.getRetryTimes(); + } + } else { + this.containerRetry = null; + } this.containerTokenIdentifier = containerTokenIdentifier; this.containerId = containerTokenIdentifier.getContainerID(); this.resource = containerTokenIdentifier.getResource(); @@ -154,7 +168,8 @@ public ContainerImpl(Configuration conf, Dispatcher dispatcher, Credentials creds, NodeManagerMetrics metrics, ContainerTokenIdentifier containerTokenIdentifier, RecoveredContainerStatus recoveredStatus, int exitCode, - String diagnostics, boolean wasKilled, Resource recoveredCapability) { + String diagnostics, boolean wasKilled, Resource recoveredCapability, + int remainRetries) { this(conf, dispatcher, stateStore, launchContext, creds, metrics, containerTokenIdentifier); this.recoveredStatus = recoveredStatus; @@ -167,6 +182,9 @@ public ContainerImpl(Configuration conf, Dispatcher dispatcher, this.resource = Resource.newInstance(recoveredCapability.getMemory(), recoveredCapability.getVirtualCores()); } + if (remainRetries != ContainerRetry.RETRY_INVALID) { + this.remainRetries = remainRetries; + } } private static final ContainerDiagnosticsUpdateTransition UPDATE_DIAGNOSTICS_TRANSITION = @@ -246,9 +264,10 @@ public ContainerImpl(Configuration conf, Dispatcher dispatcher, ContainerEventType.CONTAINER_EXITED_WITH_SUCCESS, new ExitedWithSuccessTransition(true)) .addTransition(ContainerState.RUNNING, - ContainerState.EXITED_WITH_FAILURE, + 
EnumSet.of(ContainerState.EXITED_WITH_FAILURE, + ContainerState.LOCALIZED), ContainerEventType.CONTAINER_EXITED_WITH_FAILURE, - new ExitedWithFailureTransition(true)) + new RetryWithFailureTransition()) .addTransition(ContainerState.RUNNING, ContainerState.RUNNING, ContainerEventType.UPDATE_DIAGNOSTICS_MSG, UPDATE_DIAGNOSTICS_TRANSITION) @@ -854,6 +873,82 @@ public void transition(ContainerImpl container, ContainerEvent event) { } /** + * Transition to EXITED_WITH_FAILURE or LOCALIZED state upon receiving a + * CONTAINER_EXITED_WITH_FAILURE event. + **/ + @SuppressWarnings("unchecked") // dispatcher not typed + static class RetryWithFailureTransition implements + MultipleArcTransition { + + @Override + public ContainerState transition(final ContainerImpl container, + ContainerEvent event) { + ContainerExitEvent exitEvent = (ContainerExitEvent) event; + container.exitCode = exitEvent.getExitCode(); + if (exitEvent.getDiagnosticInfo() != null) { + container.addDiagnostics(exitEvent.getDiagnosticInfo(), "\n"); + } + + // Relaunch if allowed; only a positive remainRetries is decremented. + if (container.shouldRetry(container.exitCode)) { + if (container.remainRetries > 0) { + container.remainRetries -= 1; + try { + container.stateStore.storeContainerRemainRetries( + container.getContainerId(), container.remainRetries); + } catch (IOException e) { + LOG.warn("Unable to update remainRetries in state store for " + + container.getContainerId(), e); + } + } + LOG.info("Relaunch Container " + container.getContainerId() + + ". Remain retry times : " + container.remainRetries + + ". 
Retry interval is " + + container.containerRetry.getRetryInterval() + "s"); + container.wasLaunched = false; + container.metrics.endRunningContainer(); + // wait for some time, then relaunch + new Thread() { + @Override + public void run() { + try { + Thread.sleep(container.containerRetry.getRetryInterval() * 1000L); + container.sendLaunchEvent(); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + } + } + }.start(); + return ContainerState.LOCALIZED; + } else { + new ExitedWithFailureTransition(true).transition(container, event); + return ContainerState.EXITED_WITH_FAILURE; + } + } + } + + @Override + public boolean shouldRetry(int errorCode) { + if (containerRetry == null + || errorCode == ExitCode.SUCCESS.getExitCode() + || errorCode == ExitCode.FORCE_KILLED.getExitCode() + || errorCode == ExitCode.TERMINATED.getExitCode()) { + return false; + } + + ContainerRetryPolicy retryPolicy = containerRetry.getRetryPolicy(); + if (retryPolicy == ContainerRetryPolicy.ALWAYS_RETRY + || (retryPolicy == ContainerRetryPolicy.RETRY_ON_SPECIFIC_ERROR_CODE + && containerRetry.getErrorCodes() != null + && containerRetry.getErrorCodes().contains(errorCode))) { + return remainRetries > 0 + || remainRetries == ContainerRetry.RETRY_FOREVER; + } + + return false; + } + + /** * Transition to EXITED_WITH_FAILURE upon receiving KILLED_ON_REQUEST */ static class KilledExternallyTransition extends ExitedWithFailureTransition { diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/launcher/ContainerLaunch.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/launcher/ContainerLaunch.java index 6371b21..30181a7 100644 --- 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/launcher/ContainerLaunch.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/launcher/ContainerLaunch.java @@ -329,7 +329,9 @@ public Integer call() { completed.set(true); exec.deactivateContainer(containerID); try { - context.getNMStateStore().storeContainerCompleted(containerID, ret); + if (!container.shouldRetry(ret)) { + context.getNMStateStore().storeContainerCompleted(containerID, ret); + } } catch (IOException e) { LOG.error("Unable to set exit code for container " + containerID); } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMLeveldbStateStoreService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMLeveldbStateStoreService.java index 89c71bb..802e8f8 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMLeveldbStateStoreService.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMLeveldbStateStoreService.java @@ -106,6 +106,8 @@ "/resourceChanged"; private static final String CONTAINER_KILLED_KEY_SUFFIX = "/killed"; private static final String CONTAINER_EXIT_CODE_KEY_SUFFIX = "/exitcode"; + private static final String CONTAINER_REMAIN_RETRIES_KEY_SUFFIX = + "/remainRetries"; private static final String CURRENT_MASTER_KEY_SUFFIX = "CurrentMasterKey"; private static final String PREV_MASTER_KEY_SUFFIX = "PreviousMasterKey"; @@ -238,6 +240,8 @@ private 
RecoveredContainerState loadContainerState(ContainerId containerId, } else if (suffix.equals(CONTAINER_RESOURCE_CHANGED_KEY_SUFFIX)) { rcs.capability = new ResourcePBImpl( ResourceProto.parseFrom(entry.getValue())); + } else if (suffix.equals(CONTAINER_REMAIN_RETRIES_KEY_SUFFIX)) { + rcs.setRemainRetries(Integer.parseInt(asString(entry.getValue()))); } else { throw new IOException("Unexpected container state key: " + key); } @@ -321,6 +325,18 @@ public void storeContainerCompleted(ContainerId containerId, } @Override + public void storeContainerRemainRetries(ContainerId containerId, + int remainRetries) throws IOException { + String key = CONTAINERS_KEY_PREFIX + containerId.toString() + + CONTAINER_REMAIN_RETRIES_KEY_SUFFIX; + try { + db.put(bytes(key), bytes(Integer.toString(remainRetries))); + } catch (DBException e) { + throw new IOException(e); + } + } + + @Override public void removeContainer(ContainerId containerId) throws IOException { String keyPrefix = CONTAINERS_KEY_PREFIX + containerId.toString(); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMNullStateStoreService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMNullStateStoreService.java index d5dce9b..688517f 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMNullStateStoreService.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMNullStateStoreService.java @@ -104,6 +104,11 @@ public void storeContainerCompleted(ContainerId containerId, int exitCode) } @Override + public void storeContainerRemainRetries(ContainerId containerId, + int remainRetries) throws IOException 
{ + } + + @Override public void removeContainer(ContainerId containerId) throws IOException { } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMStateStoreService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMStateStoreService.java index e8ccf54..47c7112 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMStateStoreService.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMStateStoreService.java @@ -34,6 +34,7 @@ import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.api.records.ContainerExitStatus; import org.apache.hadoop.yarn.api.records.ContainerId; +import org.apache.hadoop.yarn.api.records.ContainerRetry; import org.apache.hadoop.yarn.api.records.Resource; import org.apache.hadoop.yarn.proto.YarnProtos.LocalResourceProto; import org.apache.hadoop.yarn.proto.YarnServerNodemanagerRecoveryProtos.ContainerManagerApplicationProto; @@ -76,6 +77,7 @@ public NMStateStoreService(String name) { String diagnostics = ""; StartContainerRequest startRequest; Resource capability; + private int remainRetries = ContainerRetry.RETRY_INVALID; public RecoveredContainerStatus getStatus() { return status; @@ -100,6 +102,14 @@ public StartContainerRequest getStartRequest() { public Resource getCapability() { return capability; } + + public int getRemainRetries() { + return remainRetries; + } + + public void setRemainRetries(int retries) { + this.remainRetries = retries; + } } public static class LocalResourceTrackerState { @@ -325,6 +335,15 @@ public abstract void storeContainerDiagnostics(ContainerId containerId, 
StringBuilder diagnostics) throws IOException; /** + * Record the remaining retry attempts for a container. + * @param containerId the container ID + * @param remainRetries the remaining retry attempts when container fails to run + * @throws IOException + */ + public abstract void storeContainerRemainRetries(ContainerId containerId, + int remainRetries) throws IOException; + + /** * Remove records corresponding to a container * @param containerId the container ID * @throws IOException diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/TestContainer.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/TestContainer.java index 2ab9842..4f02fcb 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/TestContainer.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/TestContainer.java @@ -54,6 +54,8 @@ import org.apache.hadoop.yarn.api.records.ContainerExitStatus; import org.apache.hadoop.yarn.api.records.ContainerId; import org.apache.hadoop.yarn.api.records.ContainerLaunchContext; +import org.apache.hadoop.yarn.api.records.ContainerRetry; +import org.apache.hadoop.yarn.api.records.ContainerRetryPolicy; import org.apache.hadoop.yarn.api.records.ContainerStatus; import org.apache.hadoop.yarn.api.records.LocalResource; import org.apache.hadoop.yarn.api.records.LocalResourceType; @@ -645,6 +647,66 @@ public void testLaunchAfterKillRequest() throws Exception { } } + @Test + public void testContainerRetry() throws Exception{ + ContainerRetry containerRetry1 = ContainerRetry.newInstance( + 
ContainerRetryPolicy.NEVER_RETRY, null, 3, 0); + testContainerRetry(containerRetry1, 2, 0); + + ContainerRetry containerRetry2 = ContainerRetry.newInstance( + ContainerRetryPolicy.ALWAYS_RETRY, null, 3, 0); + testContainerRetry(containerRetry2, 2, 3); + + ContainerRetry containerRetry3 = ContainerRetry.newInstance( + ContainerRetryPolicy.ALWAYS_RETRY, null, 3, 0); + // If exit code is 0, it will not retry + testContainerRetry(containerRetry3, 0, 0); + + ContainerRetry containerRetry4 = ContainerRetry.newInstance( + ContainerRetryPolicy.RETRY_ON_SPECIFIC_ERROR_CODE, null, 3, 0); + testContainerRetry(containerRetry4, 2, 0); + + HashSet errorCodes = new HashSet<>(); + errorCodes.add(2); + errorCodes.add(6); + ContainerRetry containerRetry5 = ContainerRetry.newInstance( + ContainerRetryPolicy.RETRY_ON_SPECIFIC_ERROR_CODE, errorCodes, 3, 0); + testContainerRetry(containerRetry5, 2, 3); + + HashSet errorCodes2 = new HashSet<>(); + errorCodes2.add(143); + ContainerRetry containerRetry6 = ContainerRetry.newInstance( + ContainerRetryPolicy.RETRY_ON_SPECIFIC_ERROR_CODE, errorCodes2, 3, 0); + // If exit code is 143(SIGTERM), it will not retry even it is in errorCodes. 
+ testContainerRetry(containerRetry6, 143, 0); + } + + private void testContainerRetry(ContainerRetry containerRetry, int exitCode, + int expectedRetries) throws Exception{ + WrappedContainer wc = null; + try { + int retryTimes = 0; + wc = new WrappedContainer(24, 314159265358979L, 4344, "yak", + containerRetry); + wc.initContainer(); + wc.localizeResources(); + wc.launchContainer(); + while (true) { + wc.containerFailed(exitCode); + if (wc.c.getContainerState() == ContainerState.RUNNING) { + retryTimes ++; + } else { + break; + } + } + Assert.assertEquals(expectedRetries, retryTimes); + } finally { + if (wc != null) { + wc.finished(); + } + } + } + private void verifyCleanupCall(WrappedContainer wc) throws Exception { ResourcesReleasedMatcher matchesReq = new ResourcesReleasedMatcher(wc.localResources, EnumSet.of( @@ -782,12 +844,23 @@ public boolean matches(Object o) { WrappedContainer(int appId, long timestamp, int id, String user) throws IOException { - this(appId, timestamp, id, user, true, false); + this(appId, timestamp, id, user, null); + } + + WrappedContainer(int appId, long timestamp, int id, String user, + ContainerRetry containerRetry) throws IOException { + this(appId, timestamp, id, user, true, false, containerRetry); } - @SuppressWarnings("rawtypes") WrappedContainer(int appId, long timestamp, int id, String user, boolean withLocalRes, boolean withServiceData) throws IOException { + this(appId, timestamp, id, user, withLocalRes, withServiceData, null); + } + + @SuppressWarnings("rawtypes") + WrappedContainer(int appId, long timestamp, int id, String user, + boolean withLocalRes, boolean withServiceData, + ContainerRetry containerRetry) throws IOException { dispatcher = new DrainDispatcher(); dispatcher.init(new Configuration()); @@ -864,6 +937,8 @@ public boolean matches(Object o) { } when(ctxt.getServiceData()).thenReturn(serviceData); + when(ctxt.getContainerRetry()).thenReturn(containerRetry); + c = new ContainerImpl(conf, dispatcher, new 
NMNullStateStoreService(), ctxt, null, metrics, identifier); dispatcher.register(ContainerEventType.class, @@ -984,6 +1059,10 @@ public void containerFailed(int exitCode) { assert containerStatus.getDiagnostics().contains(diagnosticMsg); assert containerStatus.getExitStatus() == exitCode; drainDispatcherEvents(); + // If container needs retry, relaunch it + if (c.getContainerState() == ContainerState.LOCALIZED) { + launchContainer(); + } } public void killContainer() { diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMMemoryStateStoreService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMMemoryStateStoreService.java index a1c95ab..b83f5c6 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMMemoryStateStoreService.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMMemoryStateStoreService.java @@ -124,6 +124,7 @@ public synchronized void removeApplication(ApplicationId appId) rcsCopy.diagnostics = rcs.diagnostics; rcsCopy.startRequest = rcs.startRequest; rcsCopy.capability = rcs.capability; + rcsCopy.setRemainRetries(rcs.getRemainRetries()); result.add(rcsCopy); } return result; @@ -177,6 +178,13 @@ public synchronized void storeContainerCompleted(ContainerId containerId, } @Override + public synchronized void storeContainerRemainRetries(ContainerId containerId, + int remainRetries) throws IOException { + RecoveredContainerState rcs = getRecoveredContainerState(containerId); + rcs.setRemainRetries(remainRetries); + } + + @Override public synchronized void removeContainer(ContainerId containerId) throws IOException { 
containerStates.remove(containerId); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/recovery/TestNMLeveldbStateStoreService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/recovery/TestNMLeveldbStateStoreService.java index 08b49e7..b11aa6d 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/recovery/TestNMLeveldbStateStoreService.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/recovery/TestNMLeveldbStateStoreService.java @@ -337,6 +337,13 @@ public void testContainerStorage() throws IOException { assertEquals(containerReq, rcs.getStartRequest()); assertEquals(diags.toString(), rcs.getDiagnostics()); + // store remainRetries + stateStore.storeContainerRemainRetries(containerId, 6); + restartStateStore(); + recoveredContainers = stateStore.loadContainersState(); + assertEquals(1, recoveredContainers.size()); + assertEquals(6, recoveredContainers.get(0).getRemainRetries()); + // remove the container and verify not recovered stateStore.removeContainer(containerId); restartStateStore(); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/webapp/MockContainer.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/webapp/MockContainer.java index 394a92c..bfe407a 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/webapp/MockContainer.java +++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/webapp/MockContainer.java @@ -144,4 +144,9 @@ public ContainerTokenIdentifier getContainerTokenIdentifier() { public NMContainerStatus getNMContainerStatus() { return null; } + + @Override + public boolean shouldRetry(int errorCode) { + return false; + } }