diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/ZKRMStateStore.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/ZKRMStateStore.java index 1212a91..9dec9e9 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/ZKRMStateStore.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/ZKRMStateStore.java @@ -540,7 +540,8 @@ private synchronized void loadRMAppState(RMState rmState) throws Exception { ApplicationId appId = ApplicationId.fromString(childNodeName); ApplicationStateDataPBImpl appState = new ApplicationStateDataPBImpl( - ApplicationStateDataProto.parseFrom(childData)); + (ApplicationStateDataProto) decompressProto( + ApplicationStateDataProto.PARSER, childData)); if (!appId.equals( appState.getApplicationSubmissionContext().getApplicationId())) { @@ -587,7 +588,7 @@ public synchronized void storeApplicationStateInternal(ApplicationId appId, LOG.debug("Storing info for app: " + appId + " at: " + nodeCreatePath); } - byte[] appStateData = appStateDataPB.getProto().toByteArray(); + byte[] appStateData = compressProto(appStateDataPB.getProto()); safeCreate(nodeCreatePath, appStateData, zkAcl, CreateMode.PERSISTENT); } @@ -603,7 +604,7 @@ protected synchronized void updateApplicationStateInternal( + nodeUpdatePath); } - byte[] appStateData = appStateDataPB.getProto().toByteArray(); + byte[] appStateData = compressProto(appStateDataPB.getProto()); if (exists(nodeUpdatePath)) { safeSetData(nodeUpdatePath, appStateData, -1); @@ -1043,4 +1044,83 @@ public void run() { } } } + + @VisibleForTesting + byte[] compressProto(GeneratedMessage proto) { + byte[] ret = new byte[0]; + ByteArrayOutputStream out = null; + GZIPOutputStream zip = null; + + try { + out = new ByteArrayOutputStream(); + zip = new GZIPOutputStream(out); + // allows us to write past the protobuf (if needed) + //and flush all contents + proto.writeDelimitedTo(zip); + zip.close(); + + ret = out.toByteArray(); + + } catch (IOException e) { + LOG.info("Couldnt compress proto - using default proto toByte()"); + LOG.error(StringUtils.stringifyException(e)); + + ret = proto.toByteArray(); + + } finally { + try { + if (out != null) + out.close(); + } catch (IOException e) { + LOG.error(StringUtils.stringifyException(e)); + } + } + return ret; + } + + @VisibleForTesting + Message decompressProto( + Parser protoParser, byte[] compressedBytes) { + + byte[] original = Arrays.copyOf(compressedBytes, compressedBytes.length); + GZIPInputStream zip = null; + + try { + zip = new GZIPInputStream( + new ByteArrayInputStream(compressedBytes,0,compressedBytes.length)); + return protoParser.parseDelimitedFrom(zip); + + } catch (InvalidProtocolBufferException e) { + // we can land here because of an exception opening the GZIPStream + LOG.error("Error on parseDelimitedFrom() with compressed bits. Will try uncompressed"); + LOG.error(e.toString()); + return deserializeUncompressedProto(protoParser, original); + } catch (IOException e) { + // we can land here because of an exception opening the GZIPStream + // this is needed for forward/backward compat + LOG.error("Error on opening GZIPStream with provided bytes"); + LOG.error(e.toString()); + return deserializeUncompressedProto(protoParser, original); + + } finally { + try { + if (zip != null) + zip.close(); + } catch (IOException e) { + LOG.error(StringUtils.stringifyException(e)); + } + } + } + + private Message deserializeUncompressedProto( + Parser protoParser, byte[] original) { + LOG.info("Trying to parse protobuf without GZIP"); + try { + return protoParser.parseFrom(original); + } catch (InvalidProtocolBufferException e) { + LOG.error("Error on parseDelimitedFrom() with original bits. Cant do much more.."); + LOG.error(e.toString()); + } + return null; + } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/TestZKRMStateStore.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/TestZKRMStateStore.java index f71cf25..d5287ae 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/TestZKRMStateStore.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/TestZKRMStateStore.java @@ -207,6 +207,53 @@ public void testZKRMStateStoreRealZK() throws Exception { zkTester.getRMStateStore()).testRetryingCreateRootDir(); } + @Test + public void testZKRMStateStoreGZIPCompressor() { + TestZKRMStateStoreTester zkTester = new TestZKRMStateStoreTester(); + + ApplicationSubmissionContext context = + new ApplicationSubmissionContextPBImpl(); + ResourceRequest rr = + ResourceRequest.newInstance(Priority.newInstance(0), "n1",org.apache.hadoop.yarn.api.records.Resource.newInstance(0, 1) , 1); + ApplicationStateData appState = + ApplicationStateData + .newInstance(0,System.currentTimeMillis(), context, "test"); + StringBuilder sb = new StringBuilder(); + for (int i = 0; i < 10000; i++) { + sb.append("http://www.apache.org/vol1/cosmosAdmin/system/runtimes/blabla/fooFoo/deeDee/__foo" + + i + "__.txt \n"); + } + appState.setCallerContext(new CallerContext.Builder(sb.toString()).build()); + + try { + ZKRMStateStore store = (ZKRMStateStore) zkTester.getRMStateStore(); + byte[] compressed = store.compressProto(appState.getProto()); + byte[] uncompressed = appState.getProto().toByteArray(); + // should be roughly 28K + LOG.info("Compressed size in MB is:" + compressed.length); + // should be roughly 1MB + LOG.info("Uncompressed size in MB is:" + uncompressed.length); + + // Check if the compressed form decompressed correctly + ApplicationStateData appStateDecompressed = + new ApplicationStateDataPBImpl( + (ApplicationStateDataProto) store.decompressProto( + ApplicationStateDataProto.PARSER, compressed)); + assertEquals(appState, appStateDecompressed); + + // check if the decompressor will deal with backward compatibility + ApplicationStateData appStateProtoSerialized = + new ApplicationStateDataPBImpl( + (ApplicationStateDataProto) store.decompressProto( + ApplicationStateDataProto.PARSER, uncompressed)); + assertEquals(appState, appStateProtoSerialized); + + } catch (Exception e) { + assertTrue("couldnt instantiate ZKRMStateStore", true); + } + } + + @Test (timeout = 60000) public void testCheckMajorVersionChange() throws Exception { TestZKRMStateStoreTester zkTester = new TestZKRMStateStoreTester() {