diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/ZKRMStateStore.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/ZKRMStateStore.java
index fd0a2141e68..c60e2f711b1 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/ZKRMStateStore.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/ZKRMStateStore.java
@@ -71,6 +71,7 @@
 import java.security.PrivateKey;
 import java.security.cert.X509Certificate;
 import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
@@ -193,7 +194,14 @@
       .newInstance(1, 5);
   @VisibleForTesting
   public static final String RM_APP_ROOT_HIERARCHIES = "HIERARCHIES";
 
+  /**
+   * Size limit, in bytes, applied to serialized attempt state. Taken from
+   * the {@code jute.maxbuffer} system property, defaulting to 0xfffff.
+   */
+  public static final int maxBuffer =
+      Integer.getInteger("jute.maxbuffer", 0xfffff);
+
   /* Znode paths */
   private String zkRootNodePath;
   private String rmAppRoot;
@@ -894,6 +902,40 @@ protected synchronized void updateApplicationStateInternal(
     }
   }
 
+  /**
+   * Shortens the diagnostics text of an attempt, in place, so that its
+   * serialized form shrinks by at least {@code toTrimSize} bytes. If the
+   * diagnostics are shorter than that, they are cleared entirely.
+   *
+   * @param attemptStateDataPB attempt state whose diagnostics are trimmed
+   * @param toTrimSize number of bytes to drop; only the magnitude is used,
+   *     so both {@code size - limit} and {@code limit - size} are accepted
+   */
+  private void trimAttempStateData(
+      ApplicationAttemptStateData attemptStateDataPB, int toTrimSize) {
+    String diagnostics = attemptStateDataPB.getDiagnostics();
+    long excess = Math.abs((long) toTrimSize);
+    if (diagnostics == null || diagnostics.isEmpty() || excess == 0) {
+      return;
+    }
+    // Measure and cut in an explicit charset instead of the platform
+    // default. NOTE(review): UTF-8 assumed because getProto() suggests a
+    // protobuf message, whose string fields are UTF-8 -- confirm.
+    byte[] utf8 =
+        diagnostics.getBytes(java.nio.charset.StandardCharsets.UTF_8);
+    int keep = (int) Math.max(0L, utf8.length - excess);
+    // Do not cut inside a multi-byte character: back up while the first
+    // dropped byte is a UTF-8 continuation byte (10xxxxxx).
+    while (keep > 0 && (utf8[keep] & 0xC0) == 0x80) {
+      keep--;
+    }
+    LOG.warn("trimming diagnostics info");
+    // Decode the kept prefix. byte[].toString() would return the array's
+    // identity string (e.g. "[B@1b6d3586"), not the text.
+    attemptStateDataPB.setDiagnostics(new String(Arrays.copyOf(utf8, keep),
+        java.nio.charset.StandardCharsets.UTF_8));
+  }
+
   /*
    * Handles store, update and remove application attempt state store
    * operations.
@@ -922,6 +934,17 @@ private void handleApplicationAttemptStateOp(
     String path = getNodePath(appDirPath, appAttemptId.toString());
     byte[] attemptStateData = (attemptStateDataPB == null) ? null :
         attemptStateDataPB.getProto().toByteArray();
+    // attemptStateData is null when no state object was supplied, so the
+    // size check must not dereference it unconditionally.
+    if (attemptStateData != null && attemptStateData.length > maxBuffer) {
+      LOG.warn(
+          "{} info for attempt: {} at: {} exceeds jute.maxbuffer length {}",
+          operation, appAttemptId, path, maxBuffer);
+      // Pass the overshoot (bytes above the limit) as the amount to trim.
+      trimAttempStateData(attemptStateDataPB,
+          attemptStateData.length - maxBuffer);
+      attemptStateData = attemptStateDataPB.getProto().toByteArray();
+    }
     LOG.debug("{} info for attempt: {} at: {}", operation, appAttemptId, path);
 
     switch (operation) {