diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/ZKRMStateStore.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/ZKRMStateStore.java index fd0a2141e68..e7b794914a0 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/ZKRMStateStore.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/ZKRMStateStore.java @@ -67,10 +67,13 @@ import java.io.DataInputStream; import java.io.DataOutputStream; import java.io.IOException; +import java.io.UnsupportedEncodingException; +import java.nio.charset.StandardCharsets; import java.security.NoSuchAlgorithmException; import java.security.PrivateKey; import java.security.cert.X509Certificate; import java.util.ArrayList; +import java.util.Arrays; import java.util.HashMap; import java.util.List; import java.util.Map; @@ -193,7 +196,6 @@ .newInstance(1, 5); @VisibleForTesting public static final String RM_APP_ROOT_HIERARCHIES = "HIERARCHIES"; - /* Znode paths */ private String zkRootNodePath; private String rmAppRoot; @@ -894,6 +896,18 @@ protected synchronized void updateApplicationStateInternal( } } + private void truncateAttempStateData( + ApplicationAttemptStateData attemptStateDataPB, int toTruncateSize) throws UnsupportedEncodingException { + String diagnostics = attemptStateDataPB.getDiagnostics(); + byte[] diagnosticsBytes = diagnostics.getBytes(StandardCharsets.UTF_8); + int originLen = diagnosticsBytes.length; + if(originLen - toTruncateSize > 0){ + LOG.warn("truncating diagnostics info"); + String truncatedDiagnostics = new String(Arrays.copyOf(diagnosticsBytes, + originLen - toTruncateSize), StandardCharsets.UTF_8); + attemptStateDataPB.setDiagnostics(truncatedDiagnostics); + } + } /* * Handles store, update and remove application attempt state store * operations. @@ -922,6 +936,15 @@ private void handleApplicationAttemptStateOp( String path = getNodePath(appDirPath, appAttemptId.toString()); byte[] attemptStateData = (attemptStateDataPB == null) ? null : attemptStateDataPB.getProto().toByteArray(); + if(attemptStateData != null && attemptStateData.length > zknodeLimit){ + LOG.warn("{} info for attempt: {} at: {} exceeds the maximum allowed " + + "size for znode data. " + + "See yarn.resourcemanager.zk-max-znode-size.bytes.", + operation, appAttemptId, path, zknodeLimit); + truncateAttempStateData(attemptStateDataPB, + attemptStateData.length - zknodeLimit); + attemptStateData = attemptStateDataPB.getProto().toByteArray(); + } LOG.debug("{} info for attempt: {} at: {}", operation, appAttemptId, path); switch (operation) { diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateStoreTestBase.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateStoreTestBase.java index 9f8732acbde..b9a1a1001da 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateStoreTestBase.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateStoreTestBase.java @@ -28,6 +28,7 @@ import static org.mockito.Mockito.spy; import static org.mockito.Mockito.when; +import java.nio.charset.StandardCharsets; import java.util.ArrayList; import java.util.Arrays; import java.util.HashMap; @@ -211,6 +212,60 @@ protected RMAppAttempt storeAttempt(RMStateStore store, return mockAttempt; } + protected RMAppAttempt storeAttemptWithHugeDiagnostics(RMStateStore store, + ApplicationAttemptId attemptId, + String containerIdStr, Token appToken, + SecretKey clientTokenMasterKey, TestDispatcher dispatcher, + int diagnosticSize) + throws Exception { + + RMAppAttemptMetrics mockRmAppAttemptMetrics = + mock(RMAppAttemptMetrics.class); + Container container = new ContainerPBImpl(); + container.setId(ContainerId.fromString(containerIdStr)); + RMAppAttempt mockAttempt = mock(RMAppAttempt.class); + when(mockAttempt.getAppAttemptId()).thenReturn(attemptId); + when(mockAttempt.getMasterContainer()).thenReturn(container); + when(mockAttempt.getAMRMToken()).thenReturn(appToken); + when(mockAttempt.getClientTokenMasterKey()) + .thenReturn(clientTokenMasterKey); + when(mockAttempt.getRMAppAttemptMetrics()) + .thenReturn(mockRmAppAttemptMetrics); + when(mockRmAppAttemptMetrics.getAggregateAppResourceUsage()) + .thenReturn(new AggregateAppResourceUsage(new HashMap<>())); + String diagnostics = getDiagnosticsWithSize(diagnosticSize); + when(mockAttempt.getDiagnostics()).thenReturn(diagnostics); + dispatcher.attemptId = attemptId; + store.storeNewApplicationAttempt(mockAttempt); + + ApplicationAttemptStateData newAttemptState = + ApplicationAttemptStateData.newInstance( + mockAttempt.getAppAttemptId(), + mockAttempt.getMasterContainer(), + store.getCredentialsFromAppAttempt(mockAttempt), + mockAttempt.getStartTime(), RMAppAttemptState.FAILED, + "myTrackingUrl", mockAttempt.getDiagnostics(), + FinalApplicationStatus.FAILED, 100, + mockAttempt.getFinishTime(), new HashMap<>(), new HashMap<>()); + store.updateApplicationAttemptState(newAttemptState); + waitNotify(dispatcher); + return mockAttempt; + } + + private String getDiagnosticsWithSize(int diagnosticSize) { + byte[] input = ("This is for testing attempt znode " + + "holding huge data. ").getBytes(StandardCharsets.UTF_8); + byte[] diagnosticBytes = new byte[diagnosticSize]; + int accumulatedSize = 0; + while(accumulatedSize < diagnosticSize) { + System.arraycopy(input, 0, diagnosticBytes, accumulatedSize, + accumulatedSize + input.length > diagnosticSize ? + diagnosticSize - accumulatedSize : input.length); + accumulatedSize += input.length; + } + return new String(diagnosticBytes, StandardCharsets.UTF_8); + } + protected void updateAttempt(RMStateStore store, TestDispatcher dispatcher, ApplicationAttemptStateData attemptState) { dispatcher.attemptId = attemptState.getAttemptId(); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/TestZKRMStateStore.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/TestZKRMStateStore.java index 6b6f0131cd6..603e5036031 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/TestZKRMStateStore.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/TestZKRMStateStore.java @@ -22,6 +22,7 @@ import org.apache.curator.framework.CuratorFrameworkFactory; import org.apache.curator.retry.RetryNTimes; import org.apache.curator.test.TestingServer; +import static org.junit.Assert.assertNotEquals; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.apache.hadoop.conf.Configuration; @@ -329,6 +330,44 @@ public void handle(Event event) { ; } + @Test + public void testAttemptZKNodeLimit() throws Exception { + TestZKRMStateStoreTester zkTester = new TestZKRMStateStoreTester(); + long submitTime = System.currentTimeMillis(); + long startTime = System.currentTimeMillis() + 1234; + Configuration conf = new YarnConfiguration(); + conf.setInt(YarnConfiguration.RM_ZK_ZNODE_SIZE_LIMIT_BYTES, 100 * 1024); + RMStateStore store = zkTester.getRMStateStore(conf); + TestDispatcher dispatcher = new TestDispatcher(); + store.setRMDispatcher(dispatcher); + ApplicationId appId1 = + ApplicationId.fromString("application_1352994193343_0001"); + storeApp(store, appId1, submitTime, startTime); + ApplicationAttemptId attemptId1 = + ApplicationAttemptId.newInstance(appId1, 1); + RMAppAttempt attempt1 = storeAttemptWithHugeDiagnostics(store, attemptId1, + ContainerId.newContainerId(attemptId1, 1).toString(), + null, null, dispatcher, 100 * 1024); + + LOG.warn(attempt1.getDiagnostics()); + Thread.sleep(1000); + store.close(); + + // load state + store = zkTester.getRMStateStore(conf); + RMState state = store.loadState(); + Map rmAppState = + state.getApplicationState(); + + ApplicationStateData appState = rmAppState.get(appId1); + // app is loaded + assertNotNull(appState); + ApplicationAttemptStateData attemptStateData1 = + appState.getAttempt(attemptId1); + assertNotEquals("", attempt1.getDiagnostics(), + attemptStateData1.getDiagnostics()); + } + @Test (timeout = 60000) public void testCheckMajorVersionChange() throws Exception { TestZKRMStateStoreTester zkTester = new TestZKRMStateStoreTester() {