diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
index 7887fbce25063981062dfc72faf0d57a85decc81..5d274cbf0dbffcc09159c60d2236f292654979ff 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
@@ -2616,6 +2616,11 @@ public static boolean isAclEnabled(Configuration conf) {
public static final int DEFAULT_CLUSTER_LEVEL_APPLICATION_PRIORITY = 0;
+ public static final String APP_ATTEMPT_DIAGNOSTICS_CAPACITY_KB =
+ YARN_PREFIX + "app.attempt.diagnostics.capacity.kb";
+
+ public static final int DEFAULT_APP_ATTEMPT_DIAGNOSTICS_CAPACITY_KB = 64;
+
@Private
public static boolean isDistributedNodeLabelConfiguration(Configuration conf) {
return DISTRIBUTED_NODELABEL_CONFIGURATION_TYPE.equals(conf.get(
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml
index 1e929a8c5007e12d9d8c428880d33ea0e40390bf..68f370f5bf86f5f79d45e00a5cf282b69396b999 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml
@@ -3014,4 +3014,19 @@
3000
+
+
+ Defines the capacity of the diagnostics message of an application
+ attempt, in kilobytes.
+ When using ZooKeeper to store application state behavior, it's
+ important to limit the size of the diagnostic messages to
+ prevent YARN from overwhelming ZooKeeper. In cases where
+ yarn.resourcemanager.state-store.max-completed-applications is set to
+ a large number, it may be desirable to reduce the value of this property
+ to limit the total data stored.
+
+ yarn.app.attempt.diagnostics.capacity.kb
+ 64
+
+
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/pom.xml hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/pom.xml
index 6985d652afff22394c9c683153dc7c9b6eaf04e8..830f84c3e8687943446dfcc8725ab6c7f80e9138 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/pom.xml
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/pom.xml
@@ -52,6 +52,15 @@
org.apache.hadoop
hadoop-annotations
+
+
+ junit
+ junit
+ test
+
org.mockito
mockito-all
@@ -73,11 +82,6 @@
protobuf-java
- junit
- junit
- test
-
-
commons-io
commons-io
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttemptImpl.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttemptImpl.java
index ab84985edcbc2a6ee34de90ec9de2c1edfbfcf1d..e3d4b9fc553e73ca39d00f6a4b0037f407167ddd 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttemptImpl.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttemptImpl.java
@@ -38,6 +38,7 @@
import javax.crypto.SecretKey;
+import com.google.common.base.Preconditions;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience;
@@ -171,7 +172,7 @@
// Set to null initially. Will eventually get set
// if an RMAppAttemptUnregistrationEvent occurs
private FinalApplicationStatus finalStatus = null;
- private final StringBuilder diagnostics = new StringBuilder();
+ private final BoundedAppender diagnostics;
private int amContainerExitStatus = ContainerExitStatus.INVALID;
private Configuration conf;
@@ -518,6 +519,42 @@ public RMAppAttemptImpl(ApplicationAttemptId appAttemptId,
this.amReq = amReq;
this.blacklistedNodesForAM = amBlacklistManager;
+
+ final int diagnosticsLimitKB = getDiagnosticsLimitKBOrThrow(conf);
+
+ if (LOG.isDebugEnabled()) {
+ LOG.debug(YarnConfiguration.APP_ATTEMPT_DIAGNOSTICS_CAPACITY_KB + " : " +
+ diagnosticsLimitKB);
+ }
+
+ this.diagnostics = new BoundedAppender(diagnosticsLimitKB * 1024);
+ }
+
+ private int getDiagnosticsLimitKBOrThrow(final Configuration configuration) {
+ try {
+ final int diagnosticsLimitKB = configuration.getInt(
+ YarnConfiguration.APP_ATTEMPT_DIAGNOSTICS_CAPACITY_KB,
+ YarnConfiguration.DEFAULT_APP_ATTEMPT_DIAGNOSTICS_CAPACITY_KB);
+
+ if (diagnosticsLimitKB <= 0) {
+ final String message =
+ YarnConfiguration.APP_ATTEMPT_DIAGNOSTICS_CAPACITY_KB + " ("
+ + diagnosticsLimitKB + ") should be positive.";
+ LOG.error(message);
+
+ throw new YarnRuntimeException(message);
+ }
+
+ return diagnosticsLimitKB;
+ } catch (final NumberFormatException nfe) {
+ final String message = String.format(
+ "%s should be a positive integer. Exception message: %s",
+ YarnConfiguration.APP_ATTEMPT_DIAGNOSTICS_CAPACITY_KB,
+ nfe.getMessage());
+ LOG.error(message);
+
+ throw new YarnRuntimeException(message);
+ }
}
@Override
@@ -729,7 +766,7 @@ public int getAMRMTokenKeyId() {
public String getDiagnostics() {
this.readLock.lock();
try {
- if (diagnostics.length() == 0 && amLaunchDiagnostics != null) {
+ if (diagnostics.currentLength() == 0 && amLaunchDiagnostics != null) {
return amLaunchDiagnostics;
}
return this.diagnostics.toString();
@@ -926,8 +963,8 @@ public void recover(RMState state) {
attemptState.getState()));
}
- diagnostics.append("Attempt recovered after RM restart");
- diagnostics.append(attemptState.getDiagnostics());
+ appendToDiagnosticsSafely("Attempt recovered after RM restart");
+ appendToDiagnosticsSafely(attemptState.getDiagnostics());
this.amContainerExitStatus = attemptState.getAMContainerExitStatus();
if (amContainerExitStatus == ContainerExitStatus.PREEMPTED) {
this.attemptMetrics.setIsPreempted();
@@ -942,12 +979,21 @@ public void recover(RMState state) {
this.startTime = attemptState.getStartTime();
this.finishTime = attemptState.getFinishTime();
this.attemptMetrics.updateAggregateAppResourceUsage(
- attemptState.getMemorySeconds(),attemptState.getVcoreSeconds());
+ attemptState.getMemorySeconds(), attemptState.getVcoreSeconds());
this.attemptMetrics.updateAggregatePreemptedAppResourceUsage(
attemptState.getPreemptedMemorySeconds(),
attemptState.getPreemptedVcoreSeconds());
}
+ private void appendToDiagnosticsSafely(final String diagnosticsMessage) {
+ try {
+ diagnostics.append(diagnosticsMessage);
+ } catch (final IndexOutOfBoundsException | IllegalStateException e) {
+ LOG.warn("There was a problem appending a diagnostics message to an RM "
+ + "app attempt. Here is the message: %s", e);
+ }
+ }
+
public void transferStateFromAttempt(RMAppAttempt attempt) {
this.justFinishedContainers = attempt.getJustFinishedContainersReference();
this.finishedContainersSentToAM =
@@ -1463,7 +1509,7 @@ public AttemptFailedTransition() {
@Override
public void transition(RMAppAttemptImpl appAttempt, RMAppAttemptEvent event) {
if (event.getDiagnosticMsg() != null) {
- appAttempt.diagnostics.append(event.getDiagnosticMsg());
+ appAttempt.appendToDiagnosticsSafely(event.getDiagnosticMsg());
}
super.transition(appAttempt, event);
}
@@ -1566,7 +1612,7 @@ public void transition(RMAppAttemptImpl appAttempt,
RMAppAttemptEvent event) {
// Use diagnostic from launcher
- appAttempt.diagnostics.append(event.getDiagnosticMsg());
+ appAttempt.appendToDiagnosticsSafely(event.getDiagnosticMsg());
// Tell the app, scheduler
super.transition(appAttempt, event);
@@ -1655,8 +1701,7 @@ public void transition(RMAppAttemptImpl appAttempt,
private void setAMContainerCrashedDiagnosticsAndExitStatus(
RMAppAttemptContainerFinishedEvent finishEvent) {
ContainerStatus status = finishEvent.getContainerStatus();
- String diagnostics = getAMContainerCrashedDiagnostics(finishEvent);
- this.diagnostics.append(diagnostics);
+ appendToDiagnosticsSafely(getAMContainerCrashedDiagnostics(finishEvent));
this.amContainerExitStatus = status.getExitStatus();
}
@@ -1718,7 +1763,7 @@ public ExpiredTransition() {
@Override
public void transition(RMAppAttemptImpl appAttempt,
RMAppAttemptEvent event) {
- appAttempt.diagnostics.append(getAMExpiredDiagnostics(event));
+ appAttempt.appendToDiagnosticsSafely(getAMExpiredDiagnostics(event));
super.transition(appAttempt, event);
}
}
@@ -1740,7 +1785,8 @@ public UnexpectedAMRegisteredTransition() {
@Override
public void transition(RMAppAttemptImpl appAttempt, RMAppAttemptEvent event) {
assert appAttempt.submissionContext.getUnmanagedAM();
- appAttempt.diagnostics.append(getUnexpectedAMRegisteredDiagnostics());
+ appAttempt
+ .appendToDiagnosticsSafely(getUnexpectedAMRegisteredDiagnostics());
super.transition(appAttempt, event);
}
@@ -1825,7 +1871,7 @@ private void updateInfoOnAMUnregister(RMAppAttemptEvent event) {
progress = 1.0f;
RMAppAttemptUnregistrationEvent unregisterEvent =
(RMAppAttemptUnregistrationEvent) event;
- diagnostics.append(unregisterEvent.getDiagnosticMsg());
+ appendToDiagnosticsSafely(unregisterEvent.getDiagnosticMsg());
originalTrackingUrl = sanitizeTrackingUrl(unregisterEvent.getFinalTrackingUrl());
finalStatus = unregisterEvent.getFinalApplicationStatus();
}
@@ -2232,4 +2278,57 @@ public void setRecoveredFinalState(RMAppAttemptState finalState) {
}
return Collections.EMPTY_SET;
}
+
+ @VisibleForTesting
+ static class BoundedAppender {
+ @VisibleForTesting
+ static final String TRUNCATED_MESSAGES_TEMPLATE =
+ "Diagnostic messages truncated, showing last "
+ + "(%d) chars out of (%d) in total:%n...%s";
+
+ private final int limit;
+ private final StringBuilder messages = new StringBuilder();
+ private int totalCharacterCount = 0;
+
+ BoundedAppender(final int limit) {
+ Preconditions.checkArgument(limit > 0, "limit should be positive");
+
+ this.limit = limit;
+ }
+
+ BoundedAppender append(final CharSequence csq) {
+ appendAndCount(csq);
+ checkAndCut();
+
+ return this;
+ }
+
+ private void appendAndCount(final CharSequence csq) {
+ final int before = messages.length();
+ messages.append(csq);
+ final int after = messages.length();
+ totalCharacterCount += after - before;
+ }
+
+ private void checkAndCut() {
+ if (messages.length() > limit) {
+ final int newStart = messages.length() - limit;
+ messages.delete(0, newStart);
+ }
+ }
+
+ int currentLength() {
+ return messages.length();
+ }
+
+ @Override
+ public String toString() {
+ if (messages.length() < totalCharacterCount) {
+ return String.format(TRUNCATED_MESSAGES_TEMPLATE, messages.length(),
+ totalCharacterCount, messages.toString());
+ }
+
+ return messages.toString();
+ }
+ }
}
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/BoundedAppenderTest.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/BoundedAppenderTest.java
new file mode 100644
index 0000000000000000000000000000000000000000..bd4f9b36bd9099fbfc57c0260024e15afb8dd0e4
--- /dev/null
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/BoundedAppenderTest.java
@@ -0,0 +1,113 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt;
+
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.ExpectedException;
+
+import static org.junit.Assert.assertEquals;
+import static org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptImpl.BoundedAppender;
+
+public class BoundedAppenderTest {
+ @Rule
+ public ExpectedException expected = ExpectedException.none();
+
+ @Test
+ public void initWithZeroLimitThrowsException() {
+ expected.expect(IllegalArgumentException.class);
+ expected.expectMessage("limit should be positive");
+
+ new BoundedAppender(0);
+ }
+
+ @Test
+ public void nullAppendedNullStringRead() {
+ final BoundedAppender boundedAppender = new BoundedAppender(4);
+ boundedAppender.append(null);
+
+ assertEquals("null appended, \"null\" read", "null",
+ boundedAppender.toString());
+ }
+
+ @Test
+ public void appendBelowLimitOnceValueIsReadCorrectly() {
+ final BoundedAppender boundedAppender = new BoundedAppender(2);
+
+ boundedAppender.append("ab");
+
+ assertEquals("value appended is read correctly", "ab",
+ boundedAppender.toString());
+ }
+
+ @Test
+ public void appendValuesBelowLimitAreReadCorrectlyInFifoOrder() {
+ final BoundedAppender boundedAppender = new BoundedAppender(3);
+
+ boundedAppender.append("ab");
+ boundedAppender.append("cd");
+ boundedAppender.append("e");
+ boundedAppender.append("fg");
+
+ assertEquals("last values appended fitting limit are read correctly",
+ String.format(BoundedAppender.TRUNCATED_MESSAGES_TEMPLATE, 3, 7, "efg"),
+ boundedAppender.toString());
+ }
+
+ @Test
+ public void appendLastAboveLimitPreservesLastMessagePostfix() {
+ final BoundedAppender boundedAppender = new BoundedAppender(3);
+
+ boundedAppender.append("ab");
+ boundedAppender.append("cde");
+ boundedAppender.append("fghij");
+
+ assertEquals(
+ "last value appended above limit postfix is read correctly", String
+ .format(BoundedAppender.TRUNCATED_MESSAGES_TEMPLATE, 3, 10, "hij"),
+ boundedAppender.toString());
+ }
+
+ @Test
+ public void appendMiddleAboveLimitPreservesLastMessageAndMiddlePostfix() {
+ final BoundedAppender boundedAppender = new BoundedAppender(3);
+
+ boundedAppender.append("ab");
+ boundedAppender.append("cde");
+
+ assertEquals("last value appended above limit postfix is read correctly",
+ String.format(BoundedAppender.TRUNCATED_MESSAGES_TEMPLATE, 3, 5, "cde"),
+ boundedAppender.toString());
+
+ boundedAppender.append("fg");
+
+ assertEquals(
+ "middle value appended above limit postfix and last value are "
+ + "read correctly",
+ String.format(BoundedAppender.TRUNCATED_MESSAGES_TEMPLATE, 3, 7, "efg"),
+ boundedAppender.toString());
+
+ boundedAppender.append("hijkl");
+
+ assertEquals(
+ "last value appended above limit postfix is read correctly", String
+ .format(BoundedAppender.TRUNCATED_MESSAGES_TEMPLATE, 3, 12, "jkl"),
+ boundedAppender.toString());
+ }
+}
\ No newline at end of file