diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/ContainerLaunchContext.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/ContainerLaunchContext.java index 932945b..09584ec 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/ContainerLaunchContext.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/ContainerLaunchContext.java @@ -24,6 +24,7 @@ import org.apache.hadoop.classification.InterfaceAudience.Public; import org.apache.hadoop.classification.InterfaceStability.Stable; +import org.apache.hadoop.classification.InterfaceStability.Unstable; import org.apache.hadoop.yarn.api.ContainerManagementProtocol; import org.apache.hadoop.yarn.server.api.ApplicationInitializationContext; import org.apache.hadoop.yarn.server.api.AuxiliaryService; @@ -46,6 +47,7 @@ *
  • Optional, application-specific binary service data.
  • *
  • Environment variables for the launched process.
  • *
  • Command to launch the container.
  • + *
  • Retry strategy when container fails to run.
  • * * * @see ContainerManagementProtocol#startContainers(org.apache.hadoop.yarn.api.protocolrecords.StartContainersRequest) @@ -61,6 +63,17 @@ public static ContainerLaunchContext newInstance( Map environment, List commands, Map serviceData, ByteBuffer tokens, Map acls) { + return newInstance(localResources, environment, commands, serviceData, + tokens, acls, null); + } + + @Public + @Unstable + public static ContainerLaunchContext newInstance( + Map localResources, + Map environment, List commands, + Map serviceData, ByteBuffer tokens, + Map acls, ContainerRetry containerRetry) { ContainerLaunchContext container = Records.newRecord(ContainerLaunchContext.class); container.setLocalResources(localResources); @@ -69,6 +82,7 @@ public static ContainerLaunchContext newInstance( container.setServiceData(serviceData); container.setTokens(tokens); container.setApplicationACLs(acls); + container.setContainerRetry(containerRetry); return container; } @@ -195,4 +209,20 @@ public static ContainerLaunchContext newInstance( @Public @Stable public abstract void setApplicationACLs(Map acls); + + /** + * Get the ContainerRetry to relaunch container. + * @return ContainerRetry to relaunch container. + */ + @Public + @Unstable + public abstract ContainerRetry getContainerRetry(); + + /** + * Set the ContainerRetry to relaunch container. + * @param containerRetry ContainerRetry to relaunch container. + */ + @Public + @Unstable + public abstract void setContainerRetry(ContainerRetry containerRetry); } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/ContainerRetry.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/ContainerRetry.java new file mode 100644 index 0000000..dbb9414 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/ContainerRetry.java @@ -0,0 +1,74 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.api.records; + +import org.apache.hadoop.classification.InterfaceAudience.Private; +import org.apache.hadoop.classification.InterfaceAudience.Public; +import org.apache.hadoop.classification.InterfaceStability.Unstable; +import org.apache.hadoop.yarn.util.Records; + +import java.util.Set; + +/** + * {@code ContainerRetry} indicates how container retry after it fails to run. + *

    + * It provides details such as: + *

      + *
    • + * {@link ContainerRetryPolicy} : NEVER_RETRY(no matter what error code is + * when container fails to run, just do not retry), ALWAYS_RETRY(no matter + * what error code is, when container fails to run, just retry), + * RETRY_ON_SPECIFIC_ERROR_CODE(when container fails to run, do retry if the + * error code is one of errorCodes, otherwise do not retry. + *
    • + *
    • + * retryTimes specifies how many times to retry if need to retry, + * if the value is -1, it means retry forever. + *
    • + *
    • retryInterval specifies delaying some time before relaunch + * container, the unit is seconds
    • + *
    + */ +@Public +@Unstable +public abstract class ContainerRetry { + public static final int RETRY_FOREVER = -1; + public static final int RETRY_INVALID = -1000; + + @Private + @Unstable + public static ContainerRetry newInstance(ContainerRetryPolicy retryPolicy, + Set errorCodes, int retryTimes, int retryInterval) { + ContainerRetry containerRetry = Records.newRecord(ContainerRetry.class); + containerRetry.setRetryPolicy(retryPolicy); + containerRetry.setErrorCodes(errorCodes); + containerRetry.setRetryTimes(retryTimes); + containerRetry.setRetryInterval(retryInterval); + return containerRetry; + } + + public abstract ContainerRetryPolicy getRetryPolicy(); + public abstract void setRetryPolicy(ContainerRetryPolicy retryPolicy); + public abstract Set getErrorCodes(); + public abstract void setErrorCodes(Set errorCodes); + public abstract int getRetryTimes(); + public abstract void setRetryTimes(int retryTimes); + public abstract int getRetryInterval(); + public abstract void setRetryInterval(int retryInterval); +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/ContainerRetryPolicy.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/ContainerRetryPolicy.java new file mode 100644 index 0000000..a3816c1 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/ContainerRetryPolicy.java @@ -0,0 +1,35 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.yarn.api.records; + +import org.apache.hadoop.classification.InterfaceAudience.Public; +import org.apache.hadoop.classification.InterfaceStability.Unstable; + +/** + *

    Retry policy for relaunching a Container.

    + */ +@Public +@Unstable +public enum ContainerRetryPolicy { + /** Never retry. */ + NEVER_RETRY, + /** Always retry. */ + ALWAYS_RETRY, + /** Retry for specific error codes. */ + RETRY_ON_SPECIFIC_ERROR_CODE +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/yarn_protos.proto b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/yarn_protos.proto index 3c208e2..eb7f5fc 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/yarn_protos.proto +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/yarn_protos.proto @@ -482,6 +482,7 @@ message ContainerLaunchContextProto { repeated StringStringMapProto environment = 4; repeated string command = 5; repeated ApplicationACLMapProto application_ACLs = 6; + optional ContainerRetryProto container_retry = 7; } message ContainerStatusProto { @@ -504,6 +505,19 @@ message ContainerResourceChangeRequestProto { optional ResourceProto capability = 2; } +message ContainerRetryProto { + optional ContainerRetryPolicyProto retry_policy = 1 [default = NEVER_RETRY]; + repeated int32 error_codes = 2; + optional int32 retry_times = 3 [default = 0]; + optional int32 retry_interval = 4 [default = 0]; +} + +enum ContainerRetryPolicyProto { + NEVER_RETRY = 0; + ALWAYS_RETRY = 1; + RETRY_ON_SPECIFIC_ERROR_CODE = 2; +} + //////////////////////////////////////////////////////////////////////// ////// From common////////////////////////////////////////////////////// //////////////////////////////////////////////////////////////////////// diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/ApplicationMaster.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/ApplicationMaster.java index f410c43..76f1037 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/ApplicationMaster.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/ApplicationMaster.java @@ -32,6 +32,7 @@ import java.util.ArrayList; import java.util.Collections; import java.util.HashMap; +import java.util.HashSet; import java.util.Iterator; import java.util.List; import java.util.Map; @@ -76,6 +77,8 @@ import org.apache.hadoop.yarn.api.records.ContainerExitStatus; import org.apache.hadoop.yarn.api.records.ContainerId; import org.apache.hadoop.yarn.api.records.ContainerLaunchContext; +import org.apache.hadoop.yarn.api.records.ContainerRetry; +import org.apache.hadoop.yarn.api.records.ContainerRetryPolicy; import org.apache.hadoop.yarn.api.records.ContainerState; import org.apache.hadoop.yarn.api.records.ContainerStatus; import org.apache.hadoop.yarn.api.records.FinalApplicationStatus; @@ -253,6 +256,13 @@ // File length needed for local resource private long shellScriptPathLen = 0; + // Container retry options + private ContainerRetryPolicy containerRetryPolicy = + ContainerRetryPolicy.NEVER_RETRY; + private Set errorCodes = null; + private int retryTimes = 0; + private int retryInterval = 0; + // Timeline domain ID private String domainId = null; @@ -371,6 +381,17 @@ public boolean init(String[] args) throws ParseException, IOException { opts.addOption("num_containers", true, "No. of containers on which the shell command needs to be executed"); opts.addOption("priority", true, "Application Priority. Default 0"); + opts.addOption("retry_policy", true, + "Retry policy when container fails to run, " + + "0: NEVER_RETRY, 1: ALWAYS_RETRY, " + + "2: RETRY_ON_SPECIFIC_ERROR_CODE"); + opts.addOption("error_codes", true, + "When retry policy is set to RETRY_ON_SPECIFIC_ERROR_CODE, error codes " + + "is specified with this option, e.g. --error_codes 1,2,3"); + opts.addOption("retry_times", true, "If container could retry, it specifies" + + " retry times"); + opts.addOption("retry_interval", true, "interval between each retry, unit " + + "is seconds"); opts.addOption("debug", false, "Dump out debug information"); opts.addOption("help", false, "Print usage"); @@ -508,6 +529,19 @@ public boolean init(String[] args) throws ParseException, IOException { } requestPriority = Integer.parseInt(cliParser .getOptionValue("priority", "0")); + + containerRetryPolicy = ContainerRetryPolicy.values()[ + Integer.parseInt(cliParser.getOptionValue("retry_policy", "0"))]; + if (cliParser.hasOption("error_codes")) { + errorCodes = new HashSet<>(); + for (String errorCode : + cliParser.getOptionValue("error_codes").split(",")) { + errorCodes.add(Integer.parseInt(errorCode)); + } + } + retryTimes = Integer.parseInt(cliParser.getOptionValue("retry_times", "0")); + retryInterval = Integer.parseInt(cliParser.getOptionValue( + "retry_interval", "0")); return true; } @@ -1062,9 +1096,11 @@ public void run() { // "hadoop dfs" command inside the distributed shell. Map myShellEnv = new HashMap(shellEnv); myShellEnv.put(YARN_SHELL_ID, shellId); + ContainerRetry containerRetry = ContainerRetry.newInstance( + containerRetryPolicy, errorCodes, retryTimes, retryInterval); ContainerLaunchContext ctx = ContainerLaunchContext.newInstance( localResources, myShellEnv, commands, null, allTokens.duplicate(), - null); + null, containerRetry); containerListener.addContainer(container.getId(), container); nmClientAsync.startContainerAsync(container, ctx); } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/Client.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/Client.java index 68d2bde..bc89cce 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/Client.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/Client.java @@ -168,6 +168,8 @@ private long attemptFailuresValidityInterval = -1; + private Vector containerRetryOptions = new Vector<>(5); + // Debug flag boolean debugFlag = false; @@ -287,6 +289,17 @@ public Client(Configuration conf) throws Exception { + " will be allocated, \"\" means containers" + " can be allocated anywhere, if you don't specify the option," + " default node_label_expression of queue will be used."); + opts.addOption("retry_policy", true, + "Retry policy when container fails to run, " + + "0: NEVER_RETRY, 1: ALWAYS_RETRY, " + + "2: RETRY_ON_SPECIFIC_ERROR_CODE"); + opts.addOption("error_codes", true, + "When retry policy is set to RETRY_ON_SPECIFIC_ERROR_CODE, error codes " + + "is specified with this option, e.g. --error_codes 1,2,3"); + opts.addOption("retry_times", true, "If container could retry, it specifies" + + "retry times"); + opts.addOption("retry_interval", true, "interval between each retry, unit " + + "is seconds"); } /** @@ -429,6 +442,24 @@ public boolean init(String[] args) throws ParseException { } } + // Get container retry options + if (cliParser.hasOption("retry_policy")) { + containerRetryOptions.add("--retry_policy " + + cliParser.getOptionValue("retry_policy")); + } + if (cliParser.hasOption("error_codes")) { + containerRetryOptions.add("--error_codes " + + cliParser.getOptionValue("error_codes")); + } + if (cliParser.hasOption("retry_times")) { + containerRetryOptions.add("--retry_times " + + cliParser.getOptionValue("retry_times")); + } + if (cliParser.hasOption("retry_interval")) { + containerRetryOptions.add("--retry_interval " + + cliParser.getOptionValue("retry_interval")); + } + return true; } @@ -638,6 +669,8 @@ public boolean run() throws IOException, YarnException { vargs.add("--debug"); } + vargs.addAll(containerRetryOptions); + vargs.add("1>" + ApplicationConstants.LOG_DIR_EXPANSION_VAR + "/AppMaster.stdout"); vargs.add("2>" + ApplicationConstants.LOG_DIR_EXPANSION_VAR + "/AppMaster.stderr"); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/records/impl/pb/ContainerLaunchContextPBImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/records/impl/pb/ContainerLaunchContextPBImpl.java index 12dcfcd..1bd815c 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/records/impl/pb/ContainerLaunchContextPBImpl.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/records/impl/pb/ContainerLaunchContextPBImpl.java @@ -29,10 +29,12 @@ import org.apache.hadoop.classification.InterfaceStability.Unstable; import org.apache.hadoop.yarn.api.records.ApplicationAccessType; import org.apache.hadoop.yarn.api.records.ContainerLaunchContext; +import org.apache.hadoop.yarn.api.records.ContainerRetry; import org.apache.hadoop.yarn.api.records.LocalResource; import org.apache.hadoop.yarn.proto.YarnProtos.ApplicationACLMapProto; import org.apache.hadoop.yarn.proto.YarnProtos.ContainerLaunchContextProto; import org.apache.hadoop.yarn.proto.YarnProtos.ContainerLaunchContextProtoOrBuilder; +import org.apache.hadoop.yarn.proto.YarnProtos.ContainerRetryProto; import org.apache.hadoop.yarn.proto.YarnProtos.LocalResourceProto; import org.apache.hadoop.yarn.proto.YarnProtos.StringBytesMapProto; import org.apache.hadoop.yarn.proto.YarnProtos.StringLocalResourceMapProto; @@ -56,6 +58,7 @@ private Map environment = null; private List commands = null; private Map applicationACLS = null; + private ContainerRetry containerRetry = null; public ContainerLaunchContextPBImpl() { builder = ContainerLaunchContextProto.newBuilder(); @@ -120,6 +123,9 @@ private void mergeLocalToBuilder() { if (this.applicationACLS != null) { addApplicationACLs(); } + if (this.containerRetry != null) { + builder.setContainerRetry(convertToProtoFormat(this.containerRetry)); + } } private void mergeLocalToProto() { @@ -456,6 +462,28 @@ public void setApplicationACLs( this.applicationACLS.putAll(appACLs); } + @Override + public ContainerRetry getContainerRetry() { + ContainerLaunchContextProtoOrBuilder p = viaProto ? proto : builder; + if (this.containerRetry != null) { + return this.containerRetry; + } + if (!p.hasContainerRetry()) { + return null; + } + this.containerRetry = convertFromProtoFormat(p.getContainerRetry()); + return this.containerRetry; + } + + @Override + public void setContainerRetry(ContainerRetry retry) { + maybeInitBuilder(); + if (retry == null) { + builder.clearContainerRetry(); + } + this.containerRetry = retry; + } + private LocalResourcePBImpl convertFromProtoFormat(LocalResourceProto p) { return new LocalResourcePBImpl(p); } @@ -463,4 +491,12 @@ private LocalResourcePBImpl convertFromProtoFormat(LocalResourceProto p) { private LocalResourceProto convertToProtoFormat(LocalResource t) { return ((LocalResourcePBImpl)t).getProto(); } -} + + private ContainerRetryPBImpl convertFromProtoFormat(ContainerRetryProto p) { + return new ContainerRetryPBImpl(p); + } + + private ContainerRetryProto convertToProtoFormat(ContainerRetry t) { + return ((ContainerRetryPBImpl) t).getProto(); + } +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/records/impl/pb/ContainerRetryPBImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/records/impl/pb/ContainerRetryPBImpl.java new file mode 100644 index 0000000..1765aef --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/records/impl/pb/ContainerRetryPBImpl.java @@ -0,0 +1,175 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.api.records.impl.pb; + + +import com.google.protobuf.TextFormat; +import org.apache.hadoop.yarn.api.records.ContainerRetry; +import org.apache.hadoop.yarn.api.records.ContainerRetryPolicy; +import org.apache.hadoop.yarn.proto.YarnProtos.ContainerRetryPolicyProto; +import org.apache.hadoop.yarn.proto.YarnProtos.ContainerRetryProto; +import org.apache.hadoop.yarn.proto.YarnProtos.ContainerRetryProtoOrBuilder; + +import java.util.HashSet; +import java.util.Set; + +/** + * Implementation of ContainerRetry. + */ +public class ContainerRetryPBImpl extends ContainerRetry{ + private ContainerRetryProto proto = ContainerRetryProto.getDefaultInstance(); + private ContainerRetryProto.Builder builder = null; + private boolean viaProto = false; + + private Set errorCodes = null; + + public ContainerRetryPBImpl() { + builder = ContainerRetryProto.newBuilder(); + } + + public ContainerRetryPBImpl(ContainerRetryProto proto) { + this.proto = proto; + viaProto = true; + } + + public synchronized ContainerRetryProto getProto() { + mergeLocalToProto(); + proto = viaProto ? proto : builder.build(); + viaProto = true; + return proto; + } + + @Override + public int hashCode() { + return getProto().hashCode(); + } + + @Override + public boolean equals(Object other) { + if (other == null) { + return false; + } + if (other.getClass().isAssignableFrom(this.getClass())) { + return this.getProto().equals(this.getClass().cast(other).getProto()); + } + return false; + } + + @Override + public String toString() { + return TextFormat.shortDebugString(getProto()); + } + + private void mergeLocalToBuilder() { + if (this.errorCodes != null) { + builder.clearErrorCodes(); + builder.addAllErrorCodes(this.errorCodes); + } + } + + private synchronized void mergeLocalToProto() { + if (viaProto) { + maybeInitBuilder(); + } + mergeLocalToBuilder(); + proto = builder.build(); + viaProto = true; + } + + private synchronized void maybeInitBuilder() { + if (viaProto || builder == null) { + builder = ContainerRetryProto.newBuilder(proto); + } + viaProto = false; + } + + public ContainerRetryPolicy getRetryPolicy() { + ContainerRetryProtoOrBuilder p = viaProto ? proto : builder; + if (!p.hasRetryPolicy()) { + return ContainerRetryPolicy.NEVER_RETRY; + } + return convertFromProtoFormat(p.getRetryPolicy()); + } + + public void setRetryPolicy(ContainerRetryPolicy containerRetryPolicy) { + maybeInitBuilder(); + if (containerRetryPolicy == null) { + builder.clearRetryPolicy(); + } + builder.setRetryPolicy(convertToProtoFormat(containerRetryPolicy)); + } + + private void initErrorCodes() { + if (this.errorCodes != null) { + return; + } + ContainerRetryProtoOrBuilder p = viaProto ? proto : builder; + this.errorCodes = new HashSet<>(); + this.errorCodes.addAll(p.getErrorCodesList()); + } + + public Set getErrorCodes() { + initErrorCodes(); + return this.errorCodes; + } + + public void setErrorCodes(Set errCodes) { + maybeInitBuilder(); + if (errCodes == null || errCodes.isEmpty()) { + builder.clearErrorCodes(); + } + this.errorCodes = errCodes; + } + + public int getRetryTimes() { + ContainerRetryProtoOrBuilder p = viaProto ? proto : builder; + if (!p.hasRetryTimes()) { + return 0; + } + return p.getRetryTimes(); + } + + public void setRetryTimes(int retryTimes) { + maybeInitBuilder(); + builder.setRetryTimes(retryTimes); + } + + public int getRetryInterval() { + ContainerRetryProtoOrBuilder p = viaProto ? proto : builder; + if (!p.hasRetryInterval()) { + return 0; + } + return p.getRetryInterval(); + } + + public void setRetryInterval(int retryInterval) { + maybeInitBuilder(); + builder.setRetryInterval(retryInterval); + } + + private ContainerRetryPolicyProto convertToProtoFormat( + ContainerRetryPolicy containerRetryPolicy) { + return ProtoUtils.convertToProtoFormat(containerRetryPolicy); + } + + private ContainerRetryPolicy convertFromProtoFormat( + ContainerRetryPolicyProto containerRetryPolicyProto) { + return ProtoUtils.convertFromProtoFormat(containerRetryPolicyProto); + } +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/records/impl/pb/ProtoUtils.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/records/impl/pb/ProtoUtils.java index e742f4c..7abc038 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/records/impl/pb/ProtoUtils.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/records/impl/pb/ProtoUtils.java @@ -26,6 +26,7 @@ import org.apache.hadoop.yarn.api.records.AMCommand; import org.apache.hadoop.yarn.api.records.ApplicationAccessType; import org.apache.hadoop.yarn.api.records.ApplicationResourceUsageReport; +import org.apache.hadoop.yarn.api.records.ContainerRetryPolicy; import org.apache.hadoop.yarn.api.records.ContainerState; import org.apache.hadoop.yarn.api.records.FinalApplicationStatus; import org.apache.hadoop.yarn.api.records.LocalResourceType; @@ -53,6 +54,7 @@ import org.apache.hadoop.yarn.proto.YarnProtos.ReservationRequestInterpreterProto; import org.apache.hadoop.yarn.proto.YarnProtos.YarnApplicationAttemptStateProto; import org.apache.hadoop.yarn.proto.YarnProtos.YarnApplicationStateProto; +import org.apache.hadoop.yarn.proto.YarnProtos.ContainerRetryPolicyProto; import org.apache.hadoop.yarn.proto.YarnProtos.ContainerTypeProto; import org.apache.hadoop.yarn.proto.YarnServiceProtos; import org.apache.hadoop.yarn.server.api.ContainerType; @@ -282,4 +284,17 @@ public static ContainerTypeProto convertToProtoFormat(ContainerType e) { public static ContainerType convertFromProtoFormat(ContainerTypeProto e) { return ContainerType.valueOf(e.name()); } + + /* + * ContainerRetryPolicy + */ + public static ContainerRetryPolicyProto convertToProtoFormat( + ContainerRetryPolicy e) { + return ContainerRetryPolicyProto.valueOf(e.name()); + } + + public static ContainerRetryPolicy convertFromProtoFormat( + ContainerRetryPolicyProto e) { + return ContainerRetryPolicy.valueOf(e.name()); + } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/DefaultContainerExecutor.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/DefaultContainerExecutor.java index 9ccde5f..6437fc5 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/DefaultContainerExecutor.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/DefaultContainerExecutor.java @@ -89,6 +89,11 @@ public DefaultContainerExecutor() { protected void copyFile(Path src, Path dst, String owner) throws IOException { lfs.util().copy(src, dst); } + + protected void copyFile(Path src, Path dst, String owner, boolean overwrite) + throws IOException { + lfs.util().copy(src, dst, false, overwrite); + } protected void setScriptExecutable(Path script, String owner) throws IOException { lfs.setPermission(script, ContainerExecutor.TASK_LAUNCH_SCRIPT_PERMISSION); @@ -178,12 +183,12 @@ public int launchContainer(ContainerStartContext ctx) throws IOException { // copy container tokens to work dir Path tokenDst = new Path(containerWorkDir, ContainerLaunch.FINAL_CONTAINER_TOKENS_FILE); - copyFile(nmPrivateTokensPath, tokenDst, user); + copyFile(nmPrivateTokensPath, tokenDst, user, true); // copy launch script to work dir Path launchDst = new Path(containerWorkDir, ContainerLaunch.CONTAINER_SCRIPT); - copyFile(nmPrivateContainerScriptPath, launchDst, user); + copyFile(nmPrivateContainerScriptPath, launchDst, user, true); // Create new local launch wrapper script LocalWrapperScriptBuilder sb = getLocalWrapperScriptBuilder( diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/ContainerManagerImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/ContainerManagerImpl.java index f44de59..db76e7e 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/ContainerManagerImpl.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/ContainerManagerImpl.java @@ -349,7 +349,8 @@ private void recoverContainer(RecoveredContainerState rcs) Container container = new ContainerImpl(getConfig(), dispatcher, context.getNMStateStore(), req.getContainerLaunchContext(), credentials, metrics, token, rcs.getStatus(), rcs.getExitCode(), - rcs.getDiagnostics(), rcs.getKilled(), rcs.getCapability()); + rcs.getDiagnostics(), rcs.getKilled(), rcs.getCapability(), + rcs.getRemainRetries()); context.getContainers().put(containerId, container); dispatcher.getEventHandler().handle( new ApplicationContainerInitEvent(container)); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/ContainerImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/ContainerImpl.java index e16ea93..c0dcbc6 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/ContainerImpl.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/ContainerImpl.java @@ -41,6 +41,8 @@ import org.apache.hadoop.yarn.api.records.ContainerExitStatus; import org.apache.hadoop.yarn.api.records.ContainerId; import org.apache.hadoop.yarn.api.records.ContainerLaunchContext; +import org.apache.hadoop.yarn.api.records.ContainerRetry; +import org.apache.hadoop.yarn.api.records.ContainerRetryPolicy; import org.apache.hadoop.yarn.api.records.ContainerStatus; import org.apache.hadoop.yarn.api.records.LocalResource; import org.apache.hadoop.yarn.api.records.LocalResourceVisibility; @@ -100,6 +102,9 @@ private long containerLocalizationStartTime; private long containerLaunchStartTime; private static Clock clock = new SystemClock(); + private final ContainerRetry containerRetry; + // remain retries to relaunch container if needed + private int remainRetries; /** The NM-wide configuration - not specific to this container */ private final Configuration daemonConf; @@ -134,6 +139,10 @@ public ContainerImpl(Configuration conf, Dispatcher dispatcher, this.dispatcher = dispatcher; this.stateStore = stateStore; this.launchContext = launchContext; + this.containerRetry = launchContext.getContainerRetry(); + if (this.containerRetry != null) { + remainRetries = containerRetry.getRetryTimes(); + } this.containerTokenIdentifier = containerTokenIdentifier; this.containerId = containerTokenIdentifier.getContainerID(); this.resource = containerTokenIdentifier.getResource(); @@ -154,7 +163,8 @@ public ContainerImpl(Configuration conf, Dispatcher dispatcher, Credentials creds, NodeManagerMetrics metrics, ContainerTokenIdentifier containerTokenIdentifier, RecoveredContainerStatus recoveredStatus, int exitCode, - String diagnostics, boolean wasKilled, Resource recoveredCapability) { + String diagnostics, boolean wasKilled, Resource recoveredCapability, + int remainRetries) { this(conf, dispatcher, stateStore, launchContext, creds, metrics, containerTokenIdentifier); this.recoveredStatus = recoveredStatus; @@ -167,6 +177,9 @@ public ContainerImpl(Configuration conf, Dispatcher dispatcher, this.resource = Resource.newInstance(recoveredCapability.getMemory(), recoveredCapability.getVirtualCores()); } + if (remainRetries != ContainerRetry.RETRY_INVALID) { + this.remainRetries = remainRetries; + } } private static final ContainerDiagnosticsUpdateTransition UPDATE_DIAGNOSTICS_TRANSITION = @@ -246,9 +259,10 @@ public ContainerImpl(Configuration conf, Dispatcher dispatcher, ContainerEventType.CONTAINER_EXITED_WITH_SUCCESS, new ExitedWithSuccessTransition(true)) .addTransition(ContainerState.RUNNING, - ContainerState.EXITED_WITH_FAILURE, + EnumSet.of(ContainerState.EXITED_WITH_FAILURE, + ContainerState.LOCALIZED), ContainerEventType.CONTAINER_EXITED_WITH_FAILURE, - new ExitedWithFailureTransition(true)) + new RetryWithFailureTransition()) .addTransition(ContainerState.RUNNING, ContainerState.RUNNING, ContainerEventType.UPDATE_DIAGNOSTICS_MSG, UPDATE_DIAGNOSTICS_TRANSITION) @@ -854,6 +868,78 @@ public void transition(ContainerImpl container, ContainerEvent event) { } /** + * Transition to EXITED_WITH_FAILURE or LOCALIZED state upon + * CONTAINER_EXITED_WITH_FAILURE state. + **/ + @SuppressWarnings("unchecked") // dispatcher not typed + static class RetryWithFailureTransition implements + MultipleArcTransition { + + @Override + public ContainerState transition(final ContainerImpl container, + ContainerEvent event) { + ContainerExitEvent exitEvent = (ContainerExitEvent) event; + container.exitCode = exitEvent.getExitCode(); + if (exitEvent.getDiagnosticInfo() != null) { + container.addDiagnostics(exitEvent.getDiagnosticInfo(), "\n"); + } + + // Should retry. If maxRetries is negative, retry always. + if (container.shouldRetry(container.exitCode)) { + if (container.remainRetries > 0) { + container.remainRetries -= 1; + try { + container.stateStore.storeContainerRemainRetries( + container.getContainerId(), container.remainRetries); + } catch (IOException e) { + LOG.warn("Unable to update remainRetries in state store for " + + container.getContainerId(), e); + } + } + LOG.info("Relaunch Container " + container.getContainerId() + + ". Remain retry times : " + container.remainRetries + + ". Retry interval is " + + container.containerRetry.getRetryInterval() + "s"); + container.wasLaunched = false; + container.metrics.endRunningContainer(); + // wait for some time, then relaunch + new Thread() { + @Override + public void run() { + try { + Thread.sleep(container.containerRetry.getRetryInterval() * 1000); + container.sendLaunchEvent(); + } catch (InterruptedException e) { + return; + } + } + }.start(); + return ContainerState.LOCALIZED; + } else { + new ExitedWithFailureTransition(true).transition(container, event); + return ContainerState.EXITED_WITH_FAILURE; + } + } + } + + public boolean shouldRetry(int errorCode) { + if (containerRetry == null) { + return false; + } + + ContainerRetryPolicy retryPolicy = containerRetry.getRetryPolicy(); + if (retryPolicy == ContainerRetryPolicy.ALWAYS_RETRY + || (retryPolicy == ContainerRetryPolicy.RETRY_ON_SPECIFIC_ERROR_CODE + && containerRetry.getErrorCodes() != null + && containerRetry.getErrorCodes().contains(errorCode))) { + return remainRetries > 0 + || remainRetries == ContainerRetry.RETRY_FOREVER; + } + + return false; + } + + /** * Transition to EXITED_WITH_FAILURE upon receiving KILLED_ON_REQUEST */ static class KilledExternallyTransition extends ExitedWithFailureTransition { diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMLeveldbStateStoreService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMLeveldbStateStoreService.java index 89c71bb..1c13a07 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMLeveldbStateStoreService.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMLeveldbStateStoreService.java @@ -106,6 +106,8 @@ "/resourceChanged"; private static final String CONTAINER_KILLED_KEY_SUFFIX = "/killed"; private static final String CONTAINER_EXIT_CODE_KEY_SUFFIX = "/exitcode"; + private static final String CONTAINER_REMAIN_RETRIES_KEY_SUFFIX = + "/remainRetries"; private static final String CURRENT_MASTER_KEY_SUFFIX = "CurrentMasterKey"; private static final String PREV_MASTER_KEY_SUFFIX = "PreviousMasterKey"; @@ -238,6 +240,8 @@ private RecoveredContainerState loadContainerState(ContainerId containerId, } else if (suffix.equals(CONTAINER_RESOURCE_CHANGED_KEY_SUFFIX)) { rcs.capability = new ResourcePBImpl( ResourceProto.parseFrom(entry.getValue())); + } else if (suffix.equals(CONTAINER_REMAIN_RETRIES_KEY_SUFFIX)) { + rcs.remainRetries = Integer.parseInt(asString(entry.getValue())); } else { throw new IOException("Unexpected container state key: " + key); } @@ -321,6 +325,18 @@ public void storeContainerCompleted(ContainerId containerId, } @Override + public void storeContainerRemainRetries(ContainerId containerId, + int remainRetries) throws IOException { + String key = CONTAINERS_KEY_PREFIX + containerId.toString() + + CONTAINER_REMAIN_RETRIES_KEY_SUFFIX; + try { + db.put(bytes(key), bytes(Integer.toString(remainRetries))); + } catch (DBException e) { + throw new IOException(e); + } + } + + @Override public void removeContainer(ContainerId containerId) throws IOException { String keyPrefix = CONTAINERS_KEY_PREFIX + containerId.toString(); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMNullStateStoreService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMNullStateStoreService.java index d5dce9b..688517f 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMNullStateStoreService.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMNullStateStoreService.java @@ -104,6 +104,11 @@ public void storeContainerCompleted(ContainerId containerId, int exitCode) } @Override + public void storeContainerRemainRetries(ContainerId containerId, + int remainRetries) throws IOException { + } + + @Override public void removeContainer(ContainerId containerId) throws IOException { } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMStateStoreService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMStateStoreService.java index e8ccf54..187cb6b 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMStateStoreService.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMStateStoreService.java @@ -34,6 +34,7 @@ import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.api.records.ContainerExitStatus; import org.apache.hadoop.yarn.api.records.ContainerId; +import org.apache.hadoop.yarn.api.records.ContainerRetry; import org.apache.hadoop.yarn.api.records.Resource; import org.apache.hadoop.yarn.proto.YarnProtos.LocalResourceProto; import org.apache.hadoop.yarn.proto.YarnServerNodemanagerRecoveryProtos.ContainerManagerApplicationProto; @@ -76,6 +77,7 @@ public NMStateStoreService(String name) { String diagnostics = ""; StartContainerRequest startRequest; Resource capability; + int remainRetries = ContainerRetry.RETRY_INVALID; public RecoveredContainerStatus getStatus() { return status; @@ -100,6 +102,10 @@ public StartContainerRequest getStartRequest() { public Resource getCapability() { return capability; } + + public int getRemainRetries() { + return remainRetries; + } } public static class LocalResourceTrackerState { @@ -325,6 +331,15 @@ public abstract void storeContainerDiagnostics(ContainerId containerId, StringBuilder diagnostics) throws IOException; /** + * Record remain retry times for a container + * @param containerId the container ID + * @param remainRetries the remain retry times when container fails to run + * @throws IOException + */ + public abstract void storeContainerRemainRetries(ContainerId containerId, + int remainRetries) throws IOException; + + /** * Remove records corresponding to a container * @param containerId the container ID * @throws IOException diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMMemoryStateStoreService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMMemoryStateStoreService.java index a1c95ab..079deb2 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMMemoryStateStoreService.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMMemoryStateStoreService.java @@ -124,6 +124,7 @@ public synchronized void removeApplication(ApplicationId appId) rcsCopy.diagnostics = rcs.diagnostics; rcsCopy.startRequest = rcs.startRequest; rcsCopy.capability = rcs.capability; + rcsCopy.remainRetries = rcs.remainRetries; result.add(rcsCopy); } return result; @@ -177,6 +178,13 @@ public synchronized void storeContainerCompleted(ContainerId containerId, } @Override + public void storeContainerRemainRetries(ContainerId containerId, + int remainRetries) throws IOException { + RecoveredContainerState rcs = getRecoveredContainerState(containerId); + rcs.remainRetries = remainRetries; + } + + @Override public synchronized void removeContainer(ContainerId containerId) throws IOException { containerStates.remove(containerId);