diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java index 7887fbce25063981062dfc72faf0d57a85decc81..5d274cbf0dbffcc09159c60d2236f292654979ff 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java @@ -2616,6 +2616,11 @@ public static boolean isAclEnabled(Configuration conf) { public static final int DEFAULT_CLUSTER_LEVEL_APPLICATION_PRIORITY = 0; + public static final String APP_ATTEMPT_DIAGNOSTICS_CAPACITY_KB = + YARN_PREFIX + "app.attempt.diagnostics.capacity.kb"; + + public static final int DEFAULT_APP_ATTEMPT_DIAGNOSTICS_CAPACITY_KB = 64; + @Private public static boolean isDistributedNodeLabelConfiguration(Configuration conf) { return DISTRIBUTED_NODELABEL_CONFIGURATION_TYPE.equals(conf.get( diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml index 1e929a8c5007e12d9d8c428880d33ea0e40390bf..68f370f5bf86f5f79d45e00a5cf282b69396b999 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml @@ -3014,4 +3014,19 @@ 3000 + + + Defines the capacity of the diagnostics message of an application + attempt, in kilobytes. + When using ZooKeeper to store application state behavior, it's + important to limit the size of the diagnostic messages to + prevent YARN from overwhelming ZooKeeper. In cases where + yarn.resourcemanager.state-store.max-completed-applications is set to + a large number, it may be desirable to reduce the value of this property + to limit the total data stored. + + yarn.app.attempt.diagnostics.capacity.kb + 64 + + diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/pom.xml hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/pom.xml index 6985d652afff22394c9c683153dc7c9b6eaf04e8..0a85d0c624b54443ddd6173c20a5f5cd25f16ac7 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/pom.xml +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/pom.xml @@ -52,8 +52,17 @@ org.apache.hadoop hadoop-annotations + - org.mockito + junit + junit + test + + + org.mockito mockito-all test @@ -73,11 +82,6 @@ protobuf-java - junit - junit - test - - commons-io commons-io diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttemptImpl.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttemptImpl.java index ab84985edcbc2a6ee34de90ec9de2c1edfbfcf1d..e3d4b9fc553e73ca39d00f6a4b0037f407167ddd 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttemptImpl.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttemptImpl.java @@ -38,6 +38,7 @@ import javax.crypto.SecretKey; +import com.google.common.base.Preconditions; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.classification.InterfaceAudience; @@ -171,7 +172,7 @@ // Set to null initially. Will eventually get set // if an RMAppAttemptUnregistrationEvent occurs private FinalApplicationStatus finalStatus = null; - private final StringBuilder diagnostics = new StringBuilder(); + private final BoundedAppender diagnostics; private int amContainerExitStatus = ContainerExitStatus.INVALID; private Configuration conf; @@ -518,6 +519,42 @@ public RMAppAttemptImpl(ApplicationAttemptId appAttemptId, this.amReq = amReq; this.blacklistedNodesForAM = amBlacklistManager; + + final int diagnosticsLimitKB = getDiagnosticsLimitKBOrThrow(conf); + + if (LOG.isDebugEnabled()) { + LOG.debug(YarnConfiguration.APP_ATTEMPT_DIAGNOSTICS_CAPACITY_KB + " : " + + diagnosticsLimitKB); + } + + this.diagnostics = new BoundedAppender(diagnosticsLimitKB * 1024); + } + + private int getDiagnosticsLimitKBOrThrow(final Configuration configuration) { + try { + final int diagnosticsLimitKB = configuration.getInt( + YarnConfiguration.APP_ATTEMPT_DIAGNOSTICS_CAPACITY_KB, + YarnConfiguration.DEFAULT_APP_ATTEMPT_DIAGNOSTICS_CAPACITY_KB); + + if (diagnosticsLimitKB <= 0) { + final String message = + YarnConfiguration.APP_ATTEMPT_DIAGNOSTICS_CAPACITY_KB + " (" + + diagnosticsLimitKB + ") should be positive."; + LOG.error(message); + + throw new YarnRuntimeException(message); + } + + return diagnosticsLimitKB; + } catch (final NumberFormatException nfe) { + final String message = String.format( + "%s should be a positive integer. Exception message: %s", + YarnConfiguration.APP_ATTEMPT_DIAGNOSTICS_CAPACITY_KB, + nfe.getMessage()); + LOG.error(message); + + throw new YarnRuntimeException(message); + } } @Override @@ -729,7 +766,7 @@ public int getAMRMTokenKeyId() { public String getDiagnostics() { this.readLock.lock(); try { - if (diagnostics.length() == 0 && amLaunchDiagnostics != null) { + if (diagnostics.currentLength() == 0 && amLaunchDiagnostics != null) { return amLaunchDiagnostics; } return this.diagnostics.toString(); @@ -926,8 +963,8 @@ public void recover(RMState state) { attemptState.getState())); } - diagnostics.append("Attempt recovered after RM restart"); - diagnostics.append(attemptState.getDiagnostics()); + appendToDiagnosticsSafely("Attempt recovered after RM restart"); + appendToDiagnosticsSafely(attemptState.getDiagnostics()); this.amContainerExitStatus = attemptState.getAMContainerExitStatus(); if (amContainerExitStatus == ContainerExitStatus.PREEMPTED) { this.attemptMetrics.setIsPreempted(); @@ -942,12 +979,21 @@ public void recover(RMState state) { this.startTime = attemptState.getStartTime(); this.finishTime = attemptState.getFinishTime(); this.attemptMetrics.updateAggregateAppResourceUsage( - attemptState.getMemorySeconds(),attemptState.getVcoreSeconds()); + attemptState.getMemorySeconds(), attemptState.getVcoreSeconds()); this.attemptMetrics.updateAggregatePreemptedAppResourceUsage( attemptState.getPreemptedMemorySeconds(), attemptState.getPreemptedVcoreSeconds()); } + private void appendToDiagnosticsSafely(final String diagnosticsMessage) { + try { + diagnostics.append(diagnosticsMessage); + } catch (final IndexOutOfBoundsException | IllegalStateException e) { + LOG.warn("There was a problem appending a diagnostics message to an RM " + + "app attempt. Here is the message: %s", e); + } + } + public void transferStateFromAttempt(RMAppAttempt attempt) { this.justFinishedContainers = attempt.getJustFinishedContainersReference(); this.finishedContainersSentToAM = @@ -1463,7 +1509,7 @@ public AttemptFailedTransition() { @Override public void transition(RMAppAttemptImpl appAttempt, RMAppAttemptEvent event) { if (event.getDiagnosticMsg() != null) { - appAttempt.diagnostics.append(event.getDiagnosticMsg()); + appAttempt.appendToDiagnosticsSafely(event.getDiagnosticMsg()); } super.transition(appAttempt, event); } @@ -1566,7 +1612,7 @@ public void transition(RMAppAttemptImpl appAttempt, RMAppAttemptEvent event) { // Use diagnostic from launcher - appAttempt.diagnostics.append(event.getDiagnosticMsg()); + appAttempt.appendToDiagnosticsSafely(event.getDiagnosticMsg()); // Tell the app, scheduler super.transition(appAttempt, event); @@ -1655,8 +1701,7 @@ public void transition(RMAppAttemptImpl appAttempt, private void setAMContainerCrashedDiagnosticsAndExitStatus( RMAppAttemptContainerFinishedEvent finishEvent) { ContainerStatus status = finishEvent.getContainerStatus(); - String diagnostics = getAMContainerCrashedDiagnostics(finishEvent); - this.diagnostics.append(diagnostics); + appendToDiagnosticsSafely(getAMContainerCrashedDiagnostics(finishEvent)); this.amContainerExitStatus = status.getExitStatus(); } @@ -1718,7 +1763,7 @@ public ExpiredTransition() { @Override public void transition(RMAppAttemptImpl appAttempt, RMAppAttemptEvent event) { - appAttempt.diagnostics.append(getAMExpiredDiagnostics(event)); + appAttempt.appendToDiagnosticsSafely(getAMExpiredDiagnostics(event)); super.transition(appAttempt, event); } } @@ -1740,7 +1785,8 @@ public UnexpectedAMRegisteredTransition() { @Override public void transition(RMAppAttemptImpl appAttempt, RMAppAttemptEvent event) { assert appAttempt.submissionContext.getUnmanagedAM(); - appAttempt.diagnostics.append(getUnexpectedAMRegisteredDiagnostics()); + appAttempt + .appendToDiagnosticsSafely(getUnexpectedAMRegisteredDiagnostics()); super.transition(appAttempt, event); } @@ -1825,7 +1871,7 @@ private void updateInfoOnAMUnregister(RMAppAttemptEvent event) { progress = 1.0f; RMAppAttemptUnregistrationEvent unregisterEvent = (RMAppAttemptUnregistrationEvent) event; - diagnostics.append(unregisterEvent.getDiagnosticMsg()); + appendToDiagnosticsSafely(unregisterEvent.getDiagnosticMsg()); originalTrackingUrl = sanitizeTrackingUrl(unregisterEvent.getFinalTrackingUrl()); finalStatus = unregisterEvent.getFinalApplicationStatus(); } @@ -2232,4 +2278,57 @@ public void setRecoveredFinalState(RMAppAttemptState finalState) { } return Collections.EMPTY_SET; } + + @VisibleForTesting + static class BoundedAppender { + @VisibleForTesting + static final String TRUNCATED_MESSAGES_TEMPLATE = + "Diagnostic messages truncated, showing last " + + "(%d) chars out of (%d) in total:%n...%s"; + + private final int limit; + private final StringBuilder messages = new StringBuilder(); + private int totalCharacterCount = 0; + + BoundedAppender(final int limit) { + Preconditions.checkArgument(limit > 0, "limit should be positive"); + + this.limit = limit; + } + + BoundedAppender append(final CharSequence csq) { + appendAndCount(csq); + checkAndCut(); + + return this; + } + + private void appendAndCount(final CharSequence csq) { + final int before = messages.length(); + messages.append(csq); + final int after = messages.length(); + totalCharacterCount += after - before; + } + + private void checkAndCut() { + if (messages.length() > limit) { + final int newStart = messages.length() - limit; + messages.delete(0, newStart); + } + } + + int currentLength() { + return messages.length(); + } + + @Override + public String toString() { + if (messages.length() < totalCharacterCount) { + return String.format(TRUNCATED_MESSAGES_TEMPLATE, messages.length(), + totalCharacterCount, messages.toString()); + } + + return messages.toString(); + } + } } diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/TestBoundedAppender.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/TestBoundedAppender.java new file mode 100644 index 0000000000000000000000000000000000000000..9cb1e0404ff506e4fe60d364ad132cfb6f8da291 --- /dev/null +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/TestBoundedAppender.java @@ -0,0 +1,116 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt; + +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.ExpectedException; + +import static org.junit.Assert.assertEquals; +import static org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptImpl.BoundedAppender; + +/** + * Test class for {@link BoundedAppender}. + */ +public class TestBoundedAppender { + @Rule + public ExpectedException expected = ExpectedException.none(); + + @Test + public void initWithZeroLimitThrowsException() { + expected.expect(IllegalArgumentException.class); + expected.expectMessage("limit should be positive"); + + new BoundedAppender(0); + } + + @Test + public void nullAppendedNullStringRead() { + final BoundedAppender boundedAppender = new BoundedAppender(4); + boundedAppender.append(null); + + assertEquals("null appended, \"null\" read", "null", + boundedAppender.toString()); + } + + @Test + public void appendBelowLimitOnceValueIsReadCorrectly() { + final BoundedAppender boundedAppender = new BoundedAppender(2); + + boundedAppender.append("ab"); + + assertEquals("value appended is read correctly", "ab", + boundedAppender.toString()); + } + + @Test + public void appendValuesBelowLimitAreReadCorrectlyInFifoOrder() { + final BoundedAppender boundedAppender = new BoundedAppender(3); + + boundedAppender.append("ab"); + boundedAppender.append("cd"); + boundedAppender.append("e"); + boundedAppender.append("fg"); + + assertEquals("last values appended fitting limit are read correctly", + String.format(BoundedAppender.TRUNCATED_MESSAGES_TEMPLATE, 3, 7, "efg"), + boundedAppender.toString()); + } + + @Test + public void appendLastAboveLimitPreservesLastMessagePostfix() { + final BoundedAppender boundedAppender = new BoundedAppender(3); + + boundedAppender.append("ab"); + boundedAppender.append("cde"); + boundedAppender.append("fghij"); + + assertEquals( + "last value appended above limit postfix is read correctly", String + .format(BoundedAppender.TRUNCATED_MESSAGES_TEMPLATE, 3, 10, "hij"), + boundedAppender.toString()); + } + + @Test + public void appendMiddleAboveLimitPreservesLastMessageAndMiddlePostfix() { + final BoundedAppender boundedAppender = new BoundedAppender(3); + + boundedAppender.append("ab"); + boundedAppender.append("cde"); + + assertEquals("last value appended above limit postfix is read correctly", + String.format(BoundedAppender.TRUNCATED_MESSAGES_TEMPLATE, 3, 5, "cde"), + boundedAppender.toString()); + + boundedAppender.append("fg"); + + assertEquals( + "middle value appended above limit postfix and last value are " + + "read correctly", + String.format(BoundedAppender.TRUNCATED_MESSAGES_TEMPLATE, 3, 7, "efg"), + boundedAppender.toString()); + + boundedAppender.append("hijkl"); + + assertEquals( + "last value appended above limit postfix is read correctly", String + .format(BoundedAppender.TRUNCATED_MESSAGES_TEMPLATE, 3, 12, "jkl"), + boundedAppender.toString()); + } +} \ No newline at end of file