diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/ContainerLaunchContext.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/ContainerLaunchContext.java index 932945b..ed64b54 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/ContainerLaunchContext.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/ContainerLaunchContext.java @@ -24,6 +24,7 @@ import org.apache.hadoop.classification.InterfaceAudience.Public; import org.apache.hadoop.classification.InterfaceStability.Stable; +import org.apache.hadoop.classification.InterfaceStability.Unstable; import org.apache.hadoop.yarn.api.ContainerManagementProtocol; import org.apache.hadoop.yarn.server.api.ApplicationInitializationContext; import org.apache.hadoop.yarn.server.api.AuxiliaryService; @@ -46,6 +47,7 @@ *
  • Optional, application-specific binary service data.
  • *
  • Environment variables for the launched process.
  • *
  • Command to launch the container.
  • + *
  • Retry strategy when container fails to run.
  • * * * @see ContainerManagementProtocol#startContainers(org.apache.hadoop.yarn.api.protocolrecords.StartContainersRequest) @@ -61,6 +63,18 @@ public static ContainerLaunchContext newInstance( Map environment, List commands, Map serviceData, ByteBuffer tokens, Map acls) { + return newInstance(localResources, environment, commands, serviceData, + tokens, acls, null); + } + + @Public + @Unstable + public static ContainerLaunchContext newInstance( + Map localResources, + Map environment, List commands, + Map serviceData, ByteBuffer tokens, + Map acls, + ContainerRetryContext containerRetryContext) { ContainerLaunchContext container = Records.newRecord(ContainerLaunchContext.class); container.setLocalResources(localResources); @@ -69,6 +83,7 @@ public static ContainerLaunchContext newInstance( container.setServiceData(serviceData); container.setTokens(tokens); container.setApplicationACLs(acls); + container.setContainerRetryContext(containerRetryContext); return container; } @@ -195,4 +210,22 @@ public static ContainerLaunchContext newInstance( @Public @Stable public abstract void setApplicationACLs(Map acls); + + /** + * Get the ContainerRetryContext to relaunch container. + * @return ContainerRetryContext to relaunch container. + */ + @Public + @Unstable + public abstract ContainerRetryContext getContainerRetryContext(); + + /** + * Set the ContainerRetryContext to relaunch container. + * @param containerRetryContext ContainerRetryContext to + * relaunch container. + */ + @Public + @Unstable + public abstract void setContainerRetryContext( + ContainerRetryContext containerRetryContext); } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/ContainerRetryContext.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/ContainerRetryContext.java new file mode 100644 index 0000000..f257967 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/ContainerRetryContext.java @@ -0,0 +1,84 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.api.records; + +import org.apache.hadoop.classification.InterfaceAudience.Private; +import org.apache.hadoop.classification.InterfaceAudience.Public; +import org.apache.hadoop.classification.InterfaceStability.Unstable; +import org.apache.hadoop.yarn.util.Records; + +import java.util.Set; + +/** + * {@code ContainerRetryContext} indicates how container retry after it fails + * to run. + *

    + * It provides details such as: + *

      + *
    • + * {@link ContainerRetryPolicy} : + * - NEVER_RETRY(DEFAULT value): no matter what error code is when container + * fails to run, just do not retry. + * - ALWAYS_RETRY: no matter what error code is, when container fails to + * run, just retry. + * - RETRY_ON_SPECIFIC_ERROR_CODE: when container fails to run, do retry if + * the error code is one of errorCodes, otherwise do not retry. + * + * Note: if error code is 137(SIGKILL) or 143(SIGTERM), it will not retry + * because it is usually killed on purpose. + *
    • + *
    • + * maxRetries specifies how many times to retry if need to retry. + * If the value is -1, it means retry forever. + *
    • + *
    • retryInterval specifies delaying some time before relaunch + * container, the unit is millisecond.
    • + *
    + */ +@Public +@Unstable +public abstract class ContainerRetryContext { + public static final int RETRY_FOREVER = -1; + public static final int RETRY_INVALID = -1000; + public static final ContainerRetryContext NEVER_RETRY_CONTEXT = + newInstance(ContainerRetryPolicy.NEVER_RETRY, null, 0, 0); + + @Private + @Unstable + public static ContainerRetryContext newInstance( + ContainerRetryPolicy retryPolicy, Set errorCodes, + int maxRetries, int retryInterval) { + ContainerRetryContext containerRetryContext = + Records.newRecord(ContainerRetryContext.class); + containerRetryContext.setRetryPolicy(retryPolicy); + containerRetryContext.setErrorCodes(errorCodes); + containerRetryContext.setMaxRetries(maxRetries); + containerRetryContext.setRetryInterval(retryInterval); + return containerRetryContext; + } + + public abstract ContainerRetryPolicy getRetryPolicy(); + public abstract void setRetryPolicy(ContainerRetryPolicy retryPolicy); + public abstract Set getErrorCodes(); + public abstract void setErrorCodes(Set errorCodes); + public abstract int getMaxRetries(); + public abstract void setMaxRetries(int maxRetries); + public abstract int getRetryInterval(); + public abstract void setRetryInterval(int retryInterval); +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/ContainerRetryPolicy.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/ContainerRetryPolicy.java new file mode 100644 index 0000000..a3816c1 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/ContainerRetryPolicy.java @@ -0,0 +1,35 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.yarn.api.records; + +import org.apache.hadoop.classification.InterfaceAudience.Public; +import org.apache.hadoop.classification.InterfaceStability.Unstable; + +/** + *

    Retry policy for relaunching a Container.

    + */ +@Public +@Unstable +public enum ContainerRetryPolicy { + /** Never retry. */ + NEVER_RETRY, + /** Always retry. */ + ALWAYS_RETRY, + /** Retry for specific error codes. */ + RETRY_ON_SPECIFIC_ERROR_CODE +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/yarn_protos.proto b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/yarn_protos.proto index d122f5a..ace442d 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/yarn_protos.proto +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/yarn_protos.proto @@ -487,6 +487,7 @@ message ContainerLaunchContextProto { repeated StringStringMapProto environment = 4; repeated string command = 5; repeated ApplicationACLMapProto application_ACLs = 6; + optional ContainerRetryContextProto container_retry_context = 7; } message ContainerStatusProto { @@ -510,6 +511,19 @@ message ContainerResourceChangeRequestProto { optional ResourceProto capability = 2; } +message ContainerRetryContextProto { + optional ContainerRetryPolicyProto retry_policy = 1 [default = NEVER_RETRY]; + repeated int32 error_codes = 2; + optional int32 max_retries = 3 [default = 0]; + optional int32 retry_interval = 4 [default = 0]; +} + +enum ContainerRetryPolicyProto { + NEVER_RETRY = 0; + ALWAYS_RETRY = 1; + RETRY_ON_SPECIFIC_ERROR_CODE = 2; +} + //////////////////////////////////////////////////////////////////////// ////// From common////////////////////////////////////////////////////// //////////////////////////////////////////////////////////////////////// diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/ApplicationMaster.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/ApplicationMaster.java index f410c43..f61059c 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/ApplicationMaster.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/ApplicationMaster.java @@ -32,6 +32,7 @@ import java.util.ArrayList; import java.util.Collections; import java.util.HashMap; +import java.util.HashSet; import java.util.Iterator; import java.util.List; import java.util.Map; @@ -76,6 +77,8 @@ import org.apache.hadoop.yarn.api.records.ContainerExitStatus; import org.apache.hadoop.yarn.api.records.ContainerId; import org.apache.hadoop.yarn.api.records.ContainerLaunchContext; +import org.apache.hadoop.yarn.api.records.ContainerRetryContext; +import org.apache.hadoop.yarn.api.records.ContainerRetryPolicy; import org.apache.hadoop.yarn.api.records.ContainerState; import org.apache.hadoop.yarn.api.records.ContainerStatus; import org.apache.hadoop.yarn.api.records.FinalApplicationStatus; @@ -253,6 +256,13 @@ // File length needed for local resource private long shellScriptPathLen = 0; + // Container retry options + private ContainerRetryPolicy containerRetryPolicy = + ContainerRetryPolicy.NEVER_RETRY; + private Set errorCodes = null; + private int maxRetries = 0; + private int retryInterval = 0; + // Timeline domain ID private String domainId = null; @@ -371,6 +381,17 @@ public boolean init(String[] args) throws ParseException, IOException { opts.addOption("num_containers", true, "No. of containers on which the shell command needs to be executed"); opts.addOption("priority", true, "Application Priority. Default 0"); + opts.addOption("retry_policy", true, + "Retry policy when container fails to run, " + + "0: NEVER_RETRY, 1: ALWAYS_RETRY, " + + "2: RETRY_ON_SPECIFIC_ERROR_CODE"); + opts.addOption("error_codes", true, + "When retry policy is set to RETRY_ON_SPECIFIC_ERROR_CODE, error codes " + + "is specified with this option, e.g. --error_codes 1,2,3"); + opts.addOption("max_retries", true, "If container could retry, it specifies" + + " max retires"); + opts.addOption("retry_interval", true, "interval between each retry, unit " + + "is milliseconds"); opts.addOption("debug", false, "Dump out debug information"); opts.addOption("help", false, "Print usage"); @@ -508,6 +529,19 @@ public boolean init(String[] args) throws ParseException, IOException { } requestPriority = Integer.parseInt(cliParser .getOptionValue("priority", "0")); + + containerRetryPolicy = ContainerRetryPolicy.values()[ + Integer.parseInt(cliParser.getOptionValue("retry_policy", "0"))]; + if (cliParser.hasOption("error_codes")) { + errorCodes = new HashSet<>(); + for (String errorCode : + cliParser.getOptionValue("error_codes").split(",")) { + errorCodes.add(Integer.parseInt(errorCode)); + } + } + maxRetries = Integer.parseInt(cliParser.getOptionValue("max_retries", "0")); + retryInterval = Integer.parseInt(cliParser.getOptionValue( + "retry_interval", "0")); return true; } @@ -1062,9 +1096,11 @@ public void run() { // "hadoop dfs" command inside the distributed shell. Map myShellEnv = new HashMap(shellEnv); myShellEnv.put(YARN_SHELL_ID, shellId); + ContainerRetryContext containerRetryContext = ContainerRetryContext.newInstance( + containerRetryPolicy, errorCodes, maxRetries, retryInterval); ContainerLaunchContext ctx = ContainerLaunchContext.newInstance( localResources, myShellEnv, commands, null, allTokens.duplicate(), - null); + null, containerRetryContext); containerListener.addContainer(container.getId(), container); nmClientAsync.startContainerAsync(container, ctx); } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/Client.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/Client.java index 68d2bde..15df60b 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/Client.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/Client.java @@ -168,6 +168,8 @@ private long attemptFailuresValidityInterval = -1; + private Vector containerRetryOptions = new Vector<>(5); + // Debug flag boolean debugFlag = false; @@ -287,6 +289,17 @@ public Client(Configuration conf) throws Exception { + " will be allocated, \"\" means containers" + " can be allocated anywhere, if you don't specify the option," + " default node_label_expression of queue will be used."); + opts.addOption("retry_policy", true, + "Retry policy when container fails to run, " + + "0: NEVER_RETRY, 1: ALWAYS_RETRY, " + + "2: RETRY_ON_SPECIFIC_ERROR_CODE"); + opts.addOption("error_codes", true, + "When retry policy is set to RETRY_ON_SPECIFIC_ERROR_CODE, error codes " + + "is specified with this option, e.g. --error_codes 1,2,3"); + opts.addOption("max_retries", true, "If container could retry, it specifies" + + " max retries"); + opts.addOption("retry_interval", true, "interval between each retry, unit " + + "is milliseconds"); } /** @@ -429,6 +442,24 @@ public boolean init(String[] args) throws ParseException { } } + // Get container retry options + if (cliParser.hasOption("retry_policy")) { + containerRetryOptions.add("--retry_policy " + + cliParser.getOptionValue("retry_policy")); + } + if (cliParser.hasOption("error_codes")) { + containerRetryOptions.add("--error_codes " + + cliParser.getOptionValue("error_codes")); + } + if (cliParser.hasOption("max_retries")) { + containerRetryOptions.add("--max_retries " + + cliParser.getOptionValue("max_retries")); + } + if (cliParser.hasOption("retry_interval")) { + containerRetryOptions.add("--retry_interval " + + cliParser.getOptionValue("retry_interval")); + } + return true; } @@ -638,6 +669,8 @@ public boolean run() throws IOException, YarnException { vargs.add("--debug"); } + vargs.addAll(containerRetryOptions); + vargs.add("1>" + ApplicationConstants.LOG_DIR_EXPANSION_VAR + "/AppMaster.stdout"); vargs.add("2>" + ApplicationConstants.LOG_DIR_EXPANSION_VAR + "/AppMaster.stderr"); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/records/impl/pb/ContainerLaunchContextPBImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/records/impl/pb/ContainerLaunchContextPBImpl.java index 12dcfcd..37eea46 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/records/impl/pb/ContainerLaunchContextPBImpl.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/records/impl/pb/ContainerLaunchContextPBImpl.java @@ -29,10 +29,12 @@ import org.apache.hadoop.classification.InterfaceStability.Unstable; import org.apache.hadoop.yarn.api.records.ApplicationAccessType; import org.apache.hadoop.yarn.api.records.ContainerLaunchContext; +import org.apache.hadoop.yarn.api.records.ContainerRetryContext; import org.apache.hadoop.yarn.api.records.LocalResource; import org.apache.hadoop.yarn.proto.YarnProtos.ApplicationACLMapProto; import org.apache.hadoop.yarn.proto.YarnProtos.ContainerLaunchContextProto; import org.apache.hadoop.yarn.proto.YarnProtos.ContainerLaunchContextProtoOrBuilder; +import org.apache.hadoop.yarn.proto.YarnProtos.ContainerRetryContextProto; import org.apache.hadoop.yarn.proto.YarnProtos.LocalResourceProto; import org.apache.hadoop.yarn.proto.YarnProtos.StringBytesMapProto; import org.apache.hadoop.yarn.proto.YarnProtos.StringLocalResourceMapProto; @@ -56,7 +58,8 @@ private Map environment = null; private List commands = null; private Map applicationACLS = null; - + private ContainerRetryContext containerRetryContext = null; + public ContainerLaunchContextPBImpl() { builder = ContainerLaunchContextProto.newBuilder(); } @@ -120,6 +123,10 @@ private void mergeLocalToBuilder() { if (this.applicationACLS != null) { addApplicationACLs(); } + if (this.containerRetryContext != null) { + builder.setContainerRetryContext( + convertToProtoFormat(this.containerRetryContext)); + } } private void mergeLocalToProto() { @@ -456,6 +463,27 @@ public void setApplicationACLs( this.applicationACLS.putAll(appACLs); } + public ContainerRetryContext getContainerRetryContext() { + ContainerLaunchContextProtoOrBuilder p = viaProto ? proto : builder; + if (this.containerRetryContext != null) { + return this.containerRetryContext; + } + if (!p.hasContainerRetryContext()) { + return null; + } + this.containerRetryContext = convertFromProtoFormat( + p.getContainerRetryContext()); + return this.containerRetryContext; + } + + public void setContainerRetryContext(ContainerRetryContext retryContext) { + maybeInitBuilder(); + if (retryContext == null) { + builder.clearContainerRetryContext(); + } + this.containerRetryContext = retryContext; + } + private LocalResourcePBImpl convertFromProtoFormat(LocalResourceProto p) { return new LocalResourcePBImpl(p); } @@ -463,4 +491,14 @@ private LocalResourcePBImpl convertFromProtoFormat(LocalResourceProto p) { private LocalResourceProto convertToProtoFormat(LocalResource t) { return ((LocalResourcePBImpl)t).getProto(); } -} + + private ContainerRetryContextPBImpl convertFromProtoFormat( + ContainerRetryContextProto p) { + return new ContainerRetryContextPBImpl(p); + } + + private ContainerRetryContextProto convertToProtoFormat( + ContainerRetryContext t) { + return ((ContainerRetryContextPBImpl)t).getProto(); + } +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/records/impl/pb/ContainerRetryContextPBImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/records/impl/pb/ContainerRetryContextPBImpl.java new file mode 100644 index 0000000..a5ef70d --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/records/impl/pb/ContainerRetryContextPBImpl.java @@ -0,0 +1,177 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.api.records.impl.pb; + + +import com.google.protobuf.TextFormat; +import org.apache.hadoop.yarn.api.records.ContainerRetryContext; +import org.apache.hadoop.yarn.api.records.ContainerRetryPolicy; +import org.apache.hadoop.yarn.proto.YarnProtos.ContainerRetryPolicyProto; +import org.apache.hadoop.yarn.proto.YarnProtos.ContainerRetryContextProto; +import org.apache.hadoop.yarn.proto.YarnProtos.ContainerRetryContextProtoOrBuilder; + +import java.util.HashSet; +import java.util.Set; + +/** + * Implementation of ContainerRetryContext. + */ +public class ContainerRetryContextPBImpl extends ContainerRetryContext { + private ContainerRetryContextProto proto = + ContainerRetryContextProto.getDefaultInstance(); + private ContainerRetryContextProto.Builder builder = null; + private boolean viaProto = false; + + private Set errorCodes = null; + + public ContainerRetryContextPBImpl() { + builder = ContainerRetryContextProto.newBuilder(); + } + + public ContainerRetryContextPBImpl(ContainerRetryContextProto proto) { + this.proto = proto; + viaProto = true; + } + + public ContainerRetryContextProto getProto() { + mergeLocalToProto(); + proto = viaProto ? proto : builder.build(); + viaProto = true; + return proto; + } + + @Override + public int hashCode() { + return getProto().hashCode(); + } + + @Override + public boolean equals(Object other) { + if (other == null) { + return false; + } + if (other.getClass().isAssignableFrom(this.getClass())) { + return this.getProto().equals(this.getClass().cast(other).getProto()); + } + return false; + } + + @Override + public String toString() { + return TextFormat.shortDebugString(getProto()); + } + + private void mergeLocalToBuilder() { + if (this.errorCodes != null) { + builder.clearErrorCodes(); + builder.addAllErrorCodes(this.errorCodes); + } + } + + private void mergeLocalToProto() { + if (viaProto) { + maybeInitBuilder(); + } + mergeLocalToBuilder(); + proto = builder.build(); + viaProto = true; + } + + private void maybeInitBuilder() { + if (viaProto || builder == null) { + builder = ContainerRetryContextProto.newBuilder(proto); + } + viaProto = false; + } + + public ContainerRetryPolicy getRetryPolicy() { + ContainerRetryContextProtoOrBuilder p = viaProto ? proto : builder; + if (!p.hasRetryPolicy()) { + return ContainerRetryPolicy.NEVER_RETRY; + } + return convertFromProtoFormat(p.getRetryPolicy()); + } + + public void setRetryPolicy(ContainerRetryPolicy containerRetryPolicy) { + maybeInitBuilder(); + if (containerRetryPolicy == null) { + builder.clearRetryPolicy(); + return; + } + builder.setRetryPolicy(convertToProtoFormat(containerRetryPolicy)); + } + + private void initErrorCodes() { + if (this.errorCodes != null) { + return; + } + ContainerRetryContextProtoOrBuilder p = viaProto ? proto : builder; + this.errorCodes = new HashSet<>(); + this.errorCodes.addAll(p.getErrorCodesList()); + } + + public Set getErrorCodes() { + initErrorCodes(); + return this.errorCodes; + } + + public void setErrorCodes(Set errCodes) { + maybeInitBuilder(); + if (errCodes == null || errCodes.isEmpty()) { + builder.clearErrorCodes(); + } + this.errorCodes = errCodes; + } + + public int getMaxRetries() { + ContainerRetryContextProtoOrBuilder p = viaProto ? proto : builder; + if (!p.hasMaxRetries()) { + return 0; + } + return p.getMaxRetries(); + } + + public void setMaxRetries(int maxRetries) { + maybeInitBuilder(); + builder.setMaxRetries(maxRetries); + } + + public int getRetryInterval() { + ContainerRetryContextProtoOrBuilder p = viaProto ? proto : builder; + if (!p.hasRetryInterval()) { + return 0; + } + return p.getRetryInterval(); + } + + public void setRetryInterval(int retryInterval) { + maybeInitBuilder(); + builder.setRetryInterval(retryInterval); + } + + private ContainerRetryPolicyProto convertToProtoFormat( + ContainerRetryPolicy containerRetryPolicy) { + return ProtoUtils.convertToProtoFormat(containerRetryPolicy); + } + + private ContainerRetryPolicy convertFromProtoFormat( + ContainerRetryPolicyProto containerRetryPolicyProto) { + return ProtoUtils.convertFromProtoFormat(containerRetryPolicyProto); + } +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/records/impl/pb/ProtoUtils.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/records/impl/pb/ProtoUtils.java index 29ed0f3..d3d5948 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/records/impl/pb/ProtoUtils.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/records/impl/pb/ProtoUtils.java @@ -26,6 +26,7 @@ import org.apache.hadoop.yarn.api.records.AMCommand; import org.apache.hadoop.yarn.api.records.ApplicationAccessType; import org.apache.hadoop.yarn.api.records.ApplicationResourceUsageReport; +import org.apache.hadoop.yarn.api.records.ContainerRetryPolicy; import org.apache.hadoop.yarn.api.records.ContainerState; import org.apache.hadoop.yarn.api.records.ExecutionType; import org.apache.hadoop.yarn.api.records.FinalApplicationStatus; @@ -54,6 +55,7 @@ import org.apache.hadoop.yarn.proto.YarnProtos.ReservationRequestInterpreterProto; import org.apache.hadoop.yarn.proto.YarnProtos.YarnApplicationAttemptStateProto; import org.apache.hadoop.yarn.proto.YarnProtos.YarnApplicationStateProto; +import org.apache.hadoop.yarn.proto.YarnProtos.ContainerRetryPolicyProto; import org.apache.hadoop.yarn.proto.YarnProtos.ContainerTypeProto; import org.apache.hadoop.yarn.proto.YarnProtos.ExecutionTypeProto; import org.apache.hadoop.yarn.proto.YarnServiceProtos; @@ -294,4 +296,17 @@ public static ExecutionTypeProto convertToProtoFormat(ExecutionType e) { public static ExecutionType convertFromProtoFormat(ExecutionTypeProto e) { return ExecutionType.valueOf(e.name()); } + + /* + * ContainerRetryPolicy + */ + public static ContainerRetryPolicyProto convertToProtoFormat( + ContainerRetryPolicy e) { + return ContainerRetryPolicyProto.valueOf(e.name()); + } + + public static ContainerRetryPolicy convertFromProtoFormat( + ContainerRetryPolicyProto e) { + return ContainerRetryPolicy.valueOf(e.name()); + } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/api/TestPBImplRecords.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/api/TestPBImplRecords.java index b7f5ff7..96df10b 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/api/TestPBImplRecords.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/api/TestPBImplRecords.java @@ -118,6 +118,7 @@ import org.apache.hadoop.yarn.api.records.ContainerLaunchContext; import org.apache.hadoop.yarn.api.records.ContainerReport; import org.apache.hadoop.yarn.api.records.ContainerResourceChangeRequest; +import org.apache.hadoop.yarn.api.records.ContainerRetryContext; import org.apache.hadoop.yarn.api.records.ContainerStatus; import org.apache.hadoop.yarn.api.records.LocalResource; import org.apache.hadoop.yarn.api.records.LogAggregationContext; @@ -159,6 +160,7 @@ import org.apache.hadoop.yarn.api.records.impl.pb.ContainerPBImpl; import org.apache.hadoop.yarn.api.records.impl.pb.ContainerReportPBImpl; import org.apache.hadoop.yarn.api.records.impl.pb.ContainerResourceChangeRequestPBImpl; +import org.apache.hadoop.yarn.api.records.impl.pb.ContainerRetryContextPBImpl; import org.apache.hadoop.yarn.api.records.impl.pb.ContainerStatusPBImpl; import org.apache.hadoop.yarn.api.records.impl.pb.LocalResourcePBImpl; import org.apache.hadoop.yarn.api.records.impl.pb.NMTokenPBImpl; @@ -192,6 +194,7 @@ import org.apache.hadoop.yarn.proto.YarnProtos.ContainerProto; import org.apache.hadoop.yarn.proto.YarnProtos.ContainerReportProto; import org.apache.hadoop.yarn.proto.YarnProtos.ContainerResourceChangeRequestProto; +import org.apache.hadoop.yarn.proto.YarnProtos.ContainerRetryContextProto; import org.apache.hadoop.yarn.proto.YarnProtos.ContainerStatusProto; import org.apache.hadoop.yarn.proto.YarnProtos.LocalResourceProto; import org.apache.hadoop.yarn.proto.YarnProtos.NodeIdProto; @@ -345,7 +348,7 @@ private static Object genTypeValue(Type type) { return rand.nextBoolean(); } else if (type.equals(byte.class)) { return bytes[rand.nextInt(4)]; - } else if (type.equals(int.class)) { + } else if (type.equals(int.class) || type.equals(Integer.class)) { return rand.nextInt(1000000); } else if (type.equals(long.class)) { return Long.valueOf(rand.nextInt(1000000)); @@ -469,6 +472,7 @@ public static void setup() throws Exception { generateByNewInstance(ApplicationResourceUsageReport.class); generateByNewInstance(ApplicationReport.class); generateByNewInstance(Container.class); + generateByNewInstance(ContainerRetryContext.class); generateByNewInstance(ContainerLaunchContext.class); generateByNewInstance(ApplicationSubmissionContext.class); generateByNewInstance(ContainerReport.class); @@ -957,6 +961,12 @@ public void testContainerIdPBImpl() throws Exception { } @Test + public void testContainerRetryPBImpl() throws Exception { + validatePBImplRecord(ContainerRetryContextPBImpl.class, + ContainerRetryContextProto.class); + } + + @Test public void testContainerLaunchContextPBImpl() throws Exception { validatePBImplRecord(ContainerLaunchContextPBImpl.class, ContainerLaunchContextProto.class); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/ContainerExecutor.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/ContainerExecutor.java index 8c74bf5..d08ee67 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/ContainerExecutor.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/ContainerExecutor.java @@ -308,6 +308,7 @@ public void writeLaunchEnv(OutputStream out, } public enum ExitCode { + SUCCESS(0), FORCE_KILLED(137), TERMINATED(143), LOST(154); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/LocalDirsHandlerService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/LocalDirsHandlerService.java index 5cc4e19..976df7c 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/LocalDirsHandlerService.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/LocalDirsHandlerService.java @@ -551,6 +551,10 @@ public Path getLocalPathForWrite(String pathStr, long size, checkWrite); } + public Path getLocalPathForRead(String pathStr) throws IOException { + return getPathToRead(pathStr, getLocalDirsForRead()); + } + public Path getLogPathForWrite(String pathStr, boolean checkWrite) throws IOException { return logDirsAllocator.getLocalPathForWrite(pathStr, diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/ContainerManagerImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/ContainerManagerImpl.java index f44de59..fb37a06 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/ContainerManagerImpl.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/ContainerManagerImpl.java @@ -349,7 +349,8 @@ private void recoverContainer(RecoveredContainerState rcs) Container container = new ContainerImpl(getConfig(), dispatcher, context.getNMStateStore(), req.getContainerLaunchContext(), credentials, metrics, token, rcs.getStatus(), rcs.getExitCode(), - rcs.getDiagnostics(), rcs.getKilled(), rcs.getCapability()); + rcs.getDiagnostics(), rcs.getKilled(), rcs.getCapability(), + rcs.getRemainingRetryAttempts()); context.getContainers().put(containerId, container); dispatcher.getEventHandler().handle( new ApplicationContainerInitEvent(container)); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/Container.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/Container.java index 1d2ec56..310d8df 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/Container.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/Container.java @@ -55,6 +55,10 @@ NMContainerStatus getNMContainerStatus(); + boolean shouldRetry(int errorCode); + + boolean isRelaunch(); + String toString(); } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/ContainerImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/ContainerImpl.java index e16ea93..60da9f3 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/ContainerImpl.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/ContainerImpl.java @@ -41,6 +41,8 @@ import org.apache.hadoop.yarn.api.records.ContainerExitStatus; import org.apache.hadoop.yarn.api.records.ContainerId; import org.apache.hadoop.yarn.api.records.ContainerLaunchContext; +import org.apache.hadoop.yarn.api.records.ContainerRetryContext; +import org.apache.hadoop.yarn.api.records.ContainerRetryPolicy; import org.apache.hadoop.yarn.api.records.ContainerStatus; import org.apache.hadoop.yarn.api.records.LocalResource; import org.apache.hadoop.yarn.api.records.LocalResourceVisibility; @@ -50,6 +52,7 @@ import org.apache.hadoop.yarn.event.EventHandler; import org.apache.hadoop.yarn.security.ContainerTokenIdentifier; import org.apache.hadoop.yarn.server.api.protocolrecords.NMContainerStatus; +import org.apache.hadoop.yarn.server.nodemanager.ContainerExecutor.ExitCode; import org.apache.hadoop.yarn.server.nodemanager.NMAuditLogger; import org.apache.hadoop.yarn.server.nodemanager.NMAuditLogger.AuditConstants; import org.apache.hadoop.yarn.server.nodemanager.containermanager.AuxServicesEvent; @@ -100,6 +103,10 @@ private long containerLocalizationStartTime; private long containerLaunchStartTime; private static Clock clock = new SystemClock(); + private final ContainerRetryContext containerRetryContext; + // remaining retries to relaunch container if needed + private int remainingRetryAttempts; + private boolean isRelaunch = false; /** The NM-wide configuration - not specific to this container */ private final Configuration daemonConf; @@ -134,6 +141,13 @@ public ContainerImpl(Configuration conf, Dispatcher dispatcher, this.dispatcher = dispatcher; this.stateStore = stateStore; this.launchContext = launchContext; + if (launchContext != null + && launchContext.getContainerRetryContext() != null) { + this.containerRetryContext = launchContext.getContainerRetryContext(); + } else { + this.containerRetryContext = ContainerRetryContext.NEVER_RETRY_CONTEXT; + } + this.remainingRetryAttempts = containerRetryContext.getMaxRetries(); this.containerTokenIdentifier = containerTokenIdentifier; this.containerId = containerTokenIdentifier.getContainerID(); this.resource = containerTokenIdentifier.getResource(); @@ -154,7 +168,8 @@ public ContainerImpl(Configuration conf, Dispatcher dispatcher, Credentials creds, NodeManagerMetrics metrics, ContainerTokenIdentifier containerTokenIdentifier, RecoveredContainerStatus recoveredStatus, int exitCode, - String diagnostics, boolean wasKilled, Resource recoveredCapability) { + String diagnostics, boolean wasKilled, Resource recoveredCapability, + int remainingRetryAttempts) { this(conf, dispatcher, stateStore, launchContext, creds, metrics, containerTokenIdentifier); this.recoveredStatus = recoveredStatus; @@ -167,6 +182,9 @@ public ContainerImpl(Configuration conf, Dispatcher dispatcher, this.resource = Resource.newInstance(recoveredCapability.getMemory(), recoveredCapability.getVirtualCores()); } + if (remainingRetryAttempts != ContainerRetryContext.RETRY_INVALID) { + this.remainingRetryAttempts = remainingRetryAttempts; + } } private static final ContainerDiagnosticsUpdateTransition UPDATE_DIAGNOSTICS_TRANSITION = @@ -246,9 +264,10 @@ public ContainerImpl(Configuration conf, Dispatcher dispatcher, ContainerEventType.CONTAINER_EXITED_WITH_SUCCESS, new ExitedWithSuccessTransition(true)) .addTransition(ContainerState.RUNNING, - ContainerState.EXITED_WITH_FAILURE, + EnumSet.of(ContainerState.EXITED_WITH_FAILURE, + ContainerState.LOCALIZED), ContainerEventType.CONTAINER_EXITED_WITH_FAILURE, - new ExitedWithFailureTransition(true)) + new RetryFailureTransition()) .addTransition(ContainerState.RUNNING, ContainerState.RUNNING, ContainerEventType.UPDATE_DIAGNOSTICS_MSG, UPDATE_DIAGNOSTICS_TRANSITION) @@ -854,6 +873,86 @@ public void transition(ContainerImpl container, ContainerEvent event) { } /** + * Transition to EXITED_WITH_FAILURE or LOCALIZED state upon + * CONTAINER_EXITED_WITH_FAILURE state. + **/ + @SuppressWarnings("unchecked") // dispatcher not typed + static class RetryFailureTransition implements + MultipleArcTransition { + + @Override + public ContainerState transition(final ContainerImpl container, + ContainerEvent event) { + ContainerExitEvent exitEvent = (ContainerExitEvent) event; + container.exitCode = exitEvent.getExitCode(); + if (exitEvent.getDiagnosticInfo() != null) { + container.addDiagnostics(exitEvent.getDiagnosticInfo(), "\n"); + } + + if (container.shouldRetry(container.exitCode)) { + if (container.remainingRetryAttempts > 0) { + container.remainingRetryAttempts--; + try { + container.stateStore.storeContainerRemainingRetryAttempts( + container.getContainerId(), container.remainingRetryAttempts); + } catch (IOException e) { + LOG.warn("Unable to update remainingRetryAttempts in state store for " + + container.getContainerId(), e); + } + } + LOG.info("Relaunch Container " + container.getContainerId() + + ". Remain retry attempts : " + container.remainingRetryAttempts + + ". Retry interval is " + + container.containerRetryContext.getRetryInterval() + "ms"); + container.isRelaunch = true; + container.wasLaunched = false; + container.metrics.endRunningContainer(); + // wait for some time, then relaunch + new Thread() { + @Override + public void run() { + try { + Thread.sleep(container.containerRetryContext.getRetryInterval()); + container.sendLaunchEvent(); + } catch (InterruptedException e) { + return; + } + } + }.start(); + return ContainerState.LOCALIZED; + } else { + new ExitedWithFailureTransition(true).transition(container, event); + return ContainerState.EXITED_WITH_FAILURE; + } + } + } + + @Override + public boolean shouldRetry(int errorCode) { + if (errorCode == ExitCode.SUCCESS.getExitCode() + || errorCode == ExitCode.FORCE_KILLED.getExitCode() + || errorCode == ExitCode.TERMINATED.getExitCode()) { + return false; + } + + ContainerRetryPolicy retryPolicy = containerRetryContext.getRetryPolicy(); + if (retryPolicy == ContainerRetryPolicy.ALWAYS_RETRY + || (retryPolicy == ContainerRetryPolicy.RETRY_ON_SPECIFIC_ERROR_CODE + && containerRetryContext.getErrorCodes() != null + && containerRetryContext.getErrorCodes().contains(errorCode))) { + return remainingRetryAttempts > 0 + || remainingRetryAttempts == ContainerRetryContext.RETRY_FOREVER; + } + + return false; + } + + @Override + public boolean isRelaunch() { + return isRelaunch; + } + + /** * Transition to EXITED_WITH_FAILURE upon receiving KILLED_ON_REQUEST */ static class KilledExternallyTransition extends ExitedWithFailureTransition { diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/launcher/ContainerLaunch.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/launcher/ContainerLaunch.java index 6371b21..0b48abd 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/launcher/ContainerLaunch.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/launcher/ContainerLaunch.java @@ -48,6 +48,7 @@ import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.IOUtils; import org.apache.hadoop.security.Credentials; +import org.apache.hadoop.util.DiskChecker; import org.apache.hadoop.util.Shell; import org.apache.hadoop.util.StringUtils; import org.apache.hadoop.yarn.api.ApplicationConstants; @@ -177,6 +178,10 @@ public Integer call() { Path containerLogDir; try { + // Select the working directory for the container + Path containerWorkDir = getContainerWorkDir(container); + cleanupContainerFilesForRelaunch(container, containerWorkDir); + localResources = container.getLocalizedResources(); if (localResources == null) { throw RPCUtil.getRemoteException( @@ -191,8 +196,7 @@ public Integer call() { String appIdStr = app.getAppId().toString(); String relativeContainerLogDir = ContainerLaunch .getRelativeContainerLogDir(appIdStr, containerIdStr); - containerLogDir = - dirsHandler.getLogPathForWrite(relativeContainerLogDir, false); + containerLogDir = getContainerLogDir(container); for (String str : command) { // TODO: Should we instead work via symlinks without this grammar? newCmds.add(expandEnvironment(str, containerLogDir)); @@ -226,14 +230,6 @@ public Integer call() { DataOutputStream containerScriptOutStream = null; DataOutputStream tokensOutStream = null; - // Select the working directory for the container - Path containerWorkDir = - dirsHandler.getLocalPathForWrite(ContainerLocalizer.USERCACHE - + Path.SEPARATOR + user + Path.SEPARATOR - + ContainerLocalizer.APPCACHE + Path.SEPARATOR + appIdStr - + Path.SEPARATOR + containerIdStr, - LocalDirAllocator.SIZE_UNKNOWN, false); - String pidFileSubpath = getPidFileSubpath(appIdStr, containerIdStr); // pid file should be in nm private dir so that it is not @@ -329,7 +325,9 @@ public Integer call() { completed.set(true); exec.deactivateContainer(containerID); try { - context.getNMStateStore().storeContainerCompleted(containerID, ret); + if (!container.shouldRetry(ret)) { + context.getNMStateStore().storeContainerCompleted(containerID, ret); + } } catch (IOException e) { LOG.error("Unable to set exit code for container " + containerID); } @@ -369,6 +367,120 @@ public Integer call() { } /** + * Get container's working directory. + * If container is in relaunch and we find its previous working directory + * could be read/write it will be reused as current working directory. + * We apply a simple heuristic to find its previous working directory: + * if a good work dir with the container tokens file already exists, + * it means it is container's previous working directory. + */ + private Path getContainerWorkDir(Container container1) throws IOException { + Path containerWorkDir = null; + String relativeContainerWorkDir = ContainerLocalizer.USERCACHE + + Path.SEPARATOR + container1.getUser() + Path.SEPARATOR + + ContainerLocalizer.APPCACHE + Path.SEPARATOR + + app.getAppId().toString() + Path.SEPARATOR + + ConverterUtils.toString(container1.getContainerId()); + + if (container1.isRelaunch()) { + try { + // container's tokens file's parent is container's previous working dir + containerWorkDir = dirsHandler.getLocalPathForRead( + relativeContainerWorkDir + Path.SEPARATOR + + ContainerLaunch.FINAL_CONTAINER_TOKENS_FILE).getParent(); + // check whether containerWorDir could be read/write, it will throw + // IOException if not. + DiskChecker.checkDir( + new File(containerWorkDir.toUri().getPath())); + } catch (IOException e) { + containerWorkDir = null; + } + } + if (containerWorkDir == null) { + containerWorkDir = + dirsHandler.getLocalPathForWrite(relativeContainerWorkDir, + LocalDirAllocator.SIZE_UNKNOWN, false); + } + + return containerWorkDir; + } + + /** + * Get container's log directory. + * If container is in relaunch and we find its previous log directory + * could be read/write it will be reused as current log directory. + * We apply a simple heuristic to find its previous working directory: + * if a good work dir with file 'stdout' already exists, it means it + * is container's previous working directory. We assume there is 'stdout' + * in most containers's log directory. + */ + private Path getContainerLogDir(Container container1) throws IOException { + Path containerLogDir = null; + String relativeContainerLogDir = ContainerLaunch + .getRelativeContainerLogDir(app.getAppId().toString(), + ConverterUtils.toString(container1.getContainerId())); + + if (container1.isRelaunch()) { + try { + containerLogDir = dirsHandler.getLocalPathForRead( + relativeContainerLogDir + Path.SEPARATOR + + "stdout").getParent(); + // check whether containerWorDir could be read/write, it will throw + // IOException if not. + DiskChecker.checkDir( + new File(containerLogDir.toUri().getPath())); + } catch (IOException e) { + containerLogDir = null; + } + } + if (containerLogDir == null) { + containerLogDir = + dirsHandler.getLogPathForWrite(relativeContainerLogDir, false); + } + + return containerLogDir; + } + + /** + * Clean up container's previous files for container relaunch. + */ + private void cleanupContainerFilesForRelaunch(Container container1, + Path containerWorkDir) { + if (!container1.isRelaunch()) { + return; + } + + try { + FileContext lfs = FileContext.getLocalFSFileContext(); + // delete ContainerScriptPath + lfs.delete(new Path(containerWorkDir, CONTAINER_SCRIPT), false); + // delete TokensPath + lfs.delete( + new Path(containerWorkDir, FINAL_CONTAINER_TOKENS_FILE), false); + } catch (IOException e) { + LOG.error("Failed to delete container script file and token file", e); + } + + String appIdStr = app.getAppId().toString(); + String containerIdStr = + ConverterUtils.toString(container1.getContainerId()); + // delete pid file and pid exit code file + String pidPath = getPidFileSubpath(appIdStr, containerIdStr); + deleteLocalPath(pidPath); + deleteLocalPath(getExitCodeFile(pidPath)); + } + + private void deleteLocalPath(String relativePath) { + try { + FileContext lfs = FileContext.getLocalFSFileContext(); + Path path = dirsHandler.getLocalPathForRead(relativePath); + lfs.delete(path, false); + } catch (IOException e) { + LOG.error("Failed to delete " + relativePath, e); + } + } + + /** * Tries to tail and fetch TAIL_SIZE_IN_BYTES of data from the error log. * ErrorLog filename is not fixed and depends upon app, hence file name * pattern is used. diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMLeveldbStateStoreService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMLeveldbStateStoreService.java index 89c71bb..db84485 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMLeveldbStateStoreService.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMLeveldbStateStoreService.java @@ -106,6 +106,8 @@ "/resourceChanged"; private static final String CONTAINER_KILLED_KEY_SUFFIX = "/killed"; private static final String CONTAINER_EXIT_CODE_KEY_SUFFIX = "/exitcode"; + private static final String CONTAINER_REMAIN_RETRIES_KEY_SUFFIX = + "/remainingRetryAttempts"; private static final String CURRENT_MASTER_KEY_SUFFIX = "CurrentMasterKey"; private static final String PREV_MASTER_KEY_SUFFIX = "PreviousMasterKey"; @@ -238,6 +240,8 @@ private RecoveredContainerState loadContainerState(ContainerId containerId, } else if (suffix.equals(CONTAINER_RESOURCE_CHANGED_KEY_SUFFIX)) { rcs.capability = new ResourcePBImpl( ResourceProto.parseFrom(entry.getValue())); + } else if (suffix.equals(CONTAINER_REMAIN_RETRIES_KEY_SUFFIX)) { + rcs.setRemainingRetryAttempts(Integer.parseInt(asString(entry.getValue()))); } else { throw new IOException("Unexpected container state key: " + key); } @@ -321,6 +325,18 @@ public void storeContainerCompleted(ContainerId containerId, } @Override + public void storeContainerRemainingRetryAttempts(ContainerId containerId, + int remainingRetryAttempts) throws IOException { + String key = CONTAINERS_KEY_PREFIX + containerId.toString() + + CONTAINER_REMAIN_RETRIES_KEY_SUFFIX; + try { + db.put(bytes(key), bytes(Integer.toString(remainingRetryAttempts))); + } catch (DBException e) { + throw new IOException(e); + } + } + + @Override public void removeContainer(ContainerId containerId) throws IOException { String keyPrefix = CONTAINERS_KEY_PREFIX + containerId.toString(); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMNullStateStoreService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMNullStateStoreService.java index d5dce9b..68f4e87 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMNullStateStoreService.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMNullStateStoreService.java @@ -104,6 +104,11 @@ public void storeContainerCompleted(ContainerId containerId, int exitCode) } @Override + public void storeContainerRemainingRetryAttempts(ContainerId containerId, + int remainingRetryAttempts) throws IOException { + } + + @Override public void removeContainer(ContainerId containerId) throws IOException { } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMStateStoreService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMStateStoreService.java index e8ccf54..b733ffe 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMStateStoreService.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMStateStoreService.java @@ -34,6 +34,7 @@ import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.api.records.ContainerExitStatus; import org.apache.hadoop.yarn.api.records.ContainerId; +import org.apache.hadoop.yarn.api.records.ContainerRetryContext; import org.apache.hadoop.yarn.api.records.Resource; import org.apache.hadoop.yarn.proto.YarnProtos.LocalResourceProto; import org.apache.hadoop.yarn.proto.YarnServerNodemanagerRecoveryProtos.ContainerManagerApplicationProto; @@ -76,6 +77,7 @@ public NMStateStoreService(String name) { String diagnostics = ""; StartContainerRequest startRequest; Resource capability; + private int remainingRetryAttempts = ContainerRetryContext.RETRY_INVALID; public RecoveredContainerStatus getStatus() { return status; @@ -100,6 +102,14 @@ public StartContainerRequest getStartRequest() { public Resource getCapability() { return capability; } + + public int getRemainingRetryAttempts() { + return remainingRetryAttempts; + } + + public void setRemainingRetryAttempts(int retryAttempts) { + this.remainingRetryAttempts = retryAttempts; + } } public static class LocalResourceTrackerState { @@ -325,6 +335,15 @@ public abstract void storeContainerDiagnostics(ContainerId containerId, StringBuilder diagnostics) throws IOException; /** + * Record remaining retry attempts for a container. + * @param containerId the container ID + * @param remainingRetryAttempts the remain retry times when container fails to run + * @throws IOException + */ + public abstract void storeContainerRemainingRetryAttempts( + ContainerId containerId, int remainingRetryAttempts) throws IOException; + + /** * Remove records corresponding to a container * @param containerId the container ID * @throws IOException diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/TestContainer.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/TestContainer.java index 2ab9842..7a26df5 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/TestContainer.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/TestContainer.java @@ -54,6 +54,8 @@ import org.apache.hadoop.yarn.api.records.ContainerExitStatus; import org.apache.hadoop.yarn.api.records.ContainerId; import org.apache.hadoop.yarn.api.records.ContainerLaunchContext; +import org.apache.hadoop.yarn.api.records.ContainerRetryContext; +import org.apache.hadoop.yarn.api.records.ContainerRetryPolicy; import org.apache.hadoop.yarn.api.records.ContainerStatus; import org.apache.hadoop.yarn.api.records.LocalResource; import org.apache.hadoop.yarn.api.records.LocalResourceType; @@ -645,6 +647,66 @@ public void testLaunchAfterKillRequest() throws Exception { } } + @Test + public void testContainerRetry() throws Exception{ + ContainerRetryContext containerRetryContext1 = ContainerRetryContext.newInstance( + ContainerRetryPolicy.NEVER_RETRY, null, 3, 0); + testContainerRetry(containerRetryContext1, 2, 0); + + ContainerRetryContext containerRetryContext2 = ContainerRetryContext.newInstance( + ContainerRetryPolicy.ALWAYS_RETRY, null, 3, 0); + testContainerRetry(containerRetryContext2, 2, 3); + + ContainerRetryContext containerRetryContext3 = ContainerRetryContext.newInstance( + ContainerRetryPolicy.ALWAYS_RETRY, null, 3, 0); + // If exit code is 0, it will not retry + testContainerRetry(containerRetryContext3, 0, 0); + + ContainerRetryContext containerRetryContext4 = ContainerRetryContext.newInstance( + ContainerRetryPolicy.RETRY_ON_SPECIFIC_ERROR_CODE, null, 3, 0); + testContainerRetry(containerRetryContext4, 2, 0); + + HashSet errorCodes = new HashSet<>(); + errorCodes.add(2); + errorCodes.add(6); + ContainerRetryContext containerRetryContext5 = ContainerRetryContext.newInstance( + ContainerRetryPolicy.RETRY_ON_SPECIFIC_ERROR_CODE, errorCodes, 3, 0); + testContainerRetry(containerRetryContext5, 2, 3); + + HashSet errorCodes2 = new HashSet<>(); + errorCodes.add(143); + ContainerRetryContext containerRetryContext6 = ContainerRetryContext.newInstance( + ContainerRetryPolicy.RETRY_ON_SPECIFIC_ERROR_CODE, errorCodes2, 3, 0); + // If exit code is 143(SIGTERM), it will not retry even it is in errorCodes. + testContainerRetry(containerRetryContext5, 143, 0); + } + + private void testContainerRetry(ContainerRetryContext containerRetryContext, int exitCode, + int expectedRetries) throws Exception{ + WrappedContainer wc = null; + try { + int retryTimes = 0; + wc = new WrappedContainer(24, 314159265358979L, 4344, "yak", + containerRetryContext); + wc.initContainer(); + wc.localizeResources(); + wc.launchContainer(); + while (true) { + wc.containerFailed(exitCode); + if (wc.c.getContainerState() == ContainerState.RUNNING) { + retryTimes ++; + } else { + break; + } + } + Assert.assertEquals(expectedRetries, retryTimes); + } finally { + if (wc != null) { + wc.finished(); + } + } + } + private void verifyCleanupCall(WrappedContainer wc) throws Exception { ResourcesReleasedMatcher matchesReq = new ResourcesReleasedMatcher(wc.localResources, EnumSet.of( @@ -782,12 +844,23 @@ public boolean matches(Object o) { WrappedContainer(int appId, long timestamp, int id, String user) throws IOException { - this(appId, timestamp, id, user, true, false); + this(appId, timestamp, id, user, null); + } + + WrappedContainer(int appId, long timestamp, int id, String user, + ContainerRetryContext containerRetryContext) throws IOException { + this(appId, timestamp, id, user, true, false, containerRetryContext); } - @SuppressWarnings("rawtypes") WrappedContainer(int appId, long timestamp, int id, String user, boolean withLocalRes, boolean withServiceData) throws IOException { + this(appId, timestamp, id, user, withLocalRes, withServiceData, null); + } + + @SuppressWarnings("rawtypes") + WrappedContainer(int appId, long timestamp, int id, String user, + boolean withLocalRes, boolean withServiceData, + ContainerRetryContext containerRetryContext) throws IOException { dispatcher = new DrainDispatcher(); dispatcher.init(new Configuration()); @@ -864,6 +937,8 @@ public boolean matches(Object o) { } when(ctxt.getServiceData()).thenReturn(serviceData); + when(ctxt.getContainerRetryContext()).thenReturn(containerRetryContext); + c = new ContainerImpl(conf, dispatcher, new NMNullStateStoreService(), ctxt, null, metrics, identifier); dispatcher.register(ContainerEventType.class, @@ -984,6 +1059,10 @@ public void containerFailed(int exitCode) { assert containerStatus.getDiagnostics().contains(diagnosticMsg); assert containerStatus.getExitStatus() == exitCode; drainDispatcherEvents(); + // If container needs retry, relaunch it + if (c.getContainerState() == ContainerState.LOCALIZED) { + launchContainer(); + } } public void killContainer() { diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMMemoryStateStoreService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMMemoryStateStoreService.java index a1c95ab..81ecf65 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMMemoryStateStoreService.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMMemoryStateStoreService.java @@ -124,6 +124,7 @@ public synchronized void removeApplication(ApplicationId appId) rcsCopy.diagnostics = rcs.diagnostics; rcsCopy.startRequest = rcs.startRequest; rcsCopy.capability = rcs.capability; + rcsCopy.setRemainingRetryAttempts(rcs.getRemainingRetryAttempts()); result.add(rcsCopy); } return result; @@ -177,6 +178,13 @@ public synchronized void storeContainerCompleted(ContainerId containerId, } @Override + public void storeContainerRemainingRetryAttempts(ContainerId containerId, + int remainingRetryAttempts) throws IOException { + RecoveredContainerState rcs = getRecoveredContainerState(containerId); + rcs.setRemainingRetryAttempts(remainingRetryAttempts); + } + + @Override public synchronized void removeContainer(ContainerId containerId) throws IOException { containerStates.remove(containerId); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/recovery/TestNMLeveldbStateStoreService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/recovery/TestNMLeveldbStateStoreService.java index 08b49e7..fe70f10 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/recovery/TestNMLeveldbStateStoreService.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/recovery/TestNMLeveldbStateStoreService.java @@ -337,6 +337,13 @@ public void testContainerStorage() throws IOException { assertEquals(containerReq, rcs.getStartRequest()); assertEquals(diags.toString(), rcs.getDiagnostics()); + // store remainRetries + stateStore.storeContainerRemainingRetryAttempts(containerId, 6); + restartStateStore(); + recoveredContainers = stateStore.loadContainersState(); + assertEquals(1, recoveredContainers.size()); + assertEquals(6, recoveredContainers.get(0).getRemainingRetryAttempts()); + // remove the container and verify not recovered stateStore.removeContainer(containerId); restartStateStore(); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/webapp/MockContainer.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/webapp/MockContainer.java index 394a92c..ba52a97 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/webapp/MockContainer.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/webapp/MockContainer.java @@ -144,4 +144,14 @@ public ContainerTokenIdentifier getContainerTokenIdentifier() { public NMContainerStatus getNMContainerStatus() { return null; } + + @Override + public boolean shouldRetry(int errorCode) { + return false; + } + + @Override + public boolean isRelaunch() { + return false; + } }