From a0581e23c87b3e6b686f72fb6b6484f16efd6c2a Mon Sep 17 00:00:00 2001
From: mbautin
Date: Fri, 10 Feb 2012 16:25:46 -0800
Subject: [PATCH] Fix deflater leak

---
 .../apache/hadoop/hbase/io/hfile/Compression.java |   18 +++-
 .../apache/hadoop/hbase/io/hfile/HFileBlock.java  |   38 ++++--
 .../hbase/io/hfile/ReusableStreamGzipCodec.java   |  138 ++++++++++++++++++++
 .../hadoop/hbase/io/hfile/TestHFileBlock.java     |   19 ++--
 4 files changed, 190 insertions(+), 23 deletions(-)
 create mode 100644 src/main/java/org/apache/hadoop/hbase/io/hfile/ReusableStreamGzipCodec.java

diff --git src/main/java/org/apache/hadoop/hbase/io/hfile/Compression.java src/main/java/org/apache/hadoop/hbase/io/hfile/Compression.java
index c0de5fc..ca97924 100644
--- src/main/java/org/apache/hadoop/hbase/io/hfile/Compression.java
+++ src/main/java/org/apache/hadoop/hbase/io/hfile/Compression.java
@@ -44,6 +44,8 @@ import org.apache.hadoop.util.ReflectionUtils;
 public final class Compression {
   static final Log LOG = LogFactory.getLog(Compression.class);
 
+  private static final int COMPRESSION_BUFFER_SIZE = 32 * 1024;
+
   /**
    * Prevent the instantiation of class.
    */
@@ -119,7 +121,7 @@ public final class Compression {
       @Override
       DefaultCodec getCodec(Configuration conf) {
         if (codec == null) {
-          codec = new GzipCodec();
+          codec = new ReusableStreamGzipCodec();
           codec.setConf(new Configuration(conf));
         }
 
@@ -210,6 +212,18 @@ public final class Compression {
 
    }
 
+    private void configureCodec(CompressionCodec codec) {
+      ((Configurable) codec).getConf().setInt("io.file.buffer.size",
+          COMPRESSION_BUFFER_SIZE);
+    }
+
+    public CompressionOutputStream createCompressionOutStream(
+        OutputStream downStream, Compressor compressor) throws IOException {
+      CompressionCodec codec = getCodec(conf);
+      configureCodec(codec);
+      return codec.createOutputStream(downStream, compressor);
+    }
+
     public OutputStream createCompressionStream(
         OutputStream downStream, Compressor compressor,
         int downStreamBufferSize) throws IOException {
@@ -221,7 +235,7 @@ public final class Compression {
       else {
         bos1 = downStream;
       }
-      ((Configurable)codec).getConf().setInt("io.file.buffer.size", 32 * 1024);
+      configureCodec(codec);
       CompressionOutputStream cos =
           codec.createOutputStream(bos1, compressor);
       BufferedOutputStream bos2 =
diff --git src/main/java/org/apache/hadoop/hbase/io/hfile/HFileBlock.java src/main/java/org/apache/hadoop/hbase/io/hfile/HFileBlock.java
index 256554d..012593c 100644
--- src/main/java/org/apache/hadoop/hbase/io/hfile/HFileBlock.java
+++ src/main/java/org/apache/hadoop/hbase/io/hfile/HFileBlock.java
@@ -28,7 +28,6 @@ import java.io.DataOutput;
 import java.io.DataOutputStream;
 import java.io.IOException;
 import java.io.InputStream;
-import java.io.OutputStream;
 import java.nio.ByteBuffer;
 
 import org.apache.hadoop.fs.FSDataInputStream;
@@ -44,6 +43,7 @@ import org.apache.hadoop.hbase.util.Pair;
 import org.apache.hadoop.hbase.util.Writables;
 import org.apache.hadoop.io.IOUtils;
 import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.compress.CompressionOutputStream;
 import org.apache.hadoop.io.compress.Compressor;
 import org.apache.hadoop.io.compress.Decompressor;
 
@@ -547,6 +547,12 @@ public class HFileBlock extends SchemaConfigured implements Cacheable {
     /** Compressor, which is also reused between consecutive blocks. */
     private Compressor compressor;
 
+    /** Compression output stream */
+    private CompressionOutputStream compressionOutStream;
+
+    /** Underlying stream to write compressed bytes to */
+    private ByteArrayOutputStream compressedByteStream;
+
     /**
      * Current block type. Set in {@link #startWriting(BlockType)}. Could be
      * changed in {@link #encodeDataBlockForDisk()} from {@link BlockType#DATA}
@@ -602,9 +608,18 @@ public class HFileBlock extends SchemaConfigured implements Cacheable {
           ? dataBlockEncoder : NoOpDataBlockEncoder.INSTANCE;
 
       baosInMemory = new ByteArrayOutputStream();
-      if (compressAlgo != NONE)
+      if (compressAlgo != NONE) {
         compressor = compressionAlgorithm.getCompressor();
-
+        compressedByteStream = new ByteArrayOutputStream();
+        try {
+          compressionOutStream = compressionAlgorithm.createCompressionOutStream(
+              compressedByteStream, compressor);
+        } catch (IOException e) {
+          throw new RuntimeException("Could not create compression stream " +
+              "for algorithm " + compressionAlgorithm, e);
+        }
+      }
+
       prevOffsetByType = new long[BlockType.values().length];
       for (int i = 0; i < prevOffsetByType.length; ++i)
         prevOffsetByType[i] = -1;
@@ -697,19 +712,18 @@ public class HFileBlock extends SchemaConfigured implements Cacheable {
     private void doCompression() throws IOException {
       // do the compression
       if (compressAlgo != NONE) {
-        ByteArrayOutputStream baos = new ByteArrayOutputStream();
-        baos.write(DUMMY_HEADER);
+        compressedByteStream.reset();
+        compressedByteStream.write(DUMMY_HEADER);
+
+        compressionOutStream.resetState();
 
-        // compress the data
-        OutputStream compressingOutputStream =
-            compressAlgo.createCompressionStream(baos, compressor, 0);
-        compressingOutputStream.write(uncompressedBytesWithHeader, HEADER_SIZE,
+        compressionOutStream.write(uncompressedBytesWithHeader, HEADER_SIZE,
             uncompressedBytesWithHeader.length - HEADER_SIZE);
 
-        // finish compression stream
-        compressingOutputStream.flush();
+        compressionOutStream.flush();
+        compressionOutStream.finish();
 
-        onDiskBytesWithHeader = baos.toByteArray();
+        onDiskBytesWithHeader = compressedByteStream.toByteArray();
 
         putHeader(onDiskBytesWithHeader, 0, onDiskBytesWithHeader.length,
             uncompressedBytesWithHeader.length);
       } else {
diff --git src/main/java/org/apache/hadoop/hbase/io/hfile/ReusableStreamGzipCodec.java src/main/java/org/apache/hadoop/hbase/io/hfile/ReusableStreamGzipCodec.java
new file mode 100644
index 0000000..336f354
--- /dev/null
+++ src/main/java/org/apache/hadoop/hbase/io/hfile/ReusableStreamGzipCodec.java
@@ -0,0 +1,138 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.hadoop.hbase.io.hfile;
+
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.io.OutputStream;
+import java.util.Arrays;
+import java.util.zip.GZIPOutputStream;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.io.compress.CompressionOutputStream;
+import org.apache.hadoop.io.compress.CompressorStream;
+import org.apache.hadoop.io.compress.GzipCodec;
+import org.apache.hadoop.io.compress.zlib.ZlibFactory;
+
+/**
+ * Fixes an inefficiency in Hadoop's Gzip codec, allowing compression streams
+ * to be reused.
+ */
+public class ReusableStreamGzipCodec extends GzipCodec {
+
+  private static final Log LOG =
+      LogFactory.getLog(ReusableStreamGzipCodec.class);
+
+  private static final int NATIVE_GZIP_DEFAULT_BUFFER_SIZE = 4 * 1024;
+
+  /**
+   * A bridge that wraps around a DeflaterOutputStream to make it a
+   * CompressionOutputStream.
+   */
+  protected static class ReusableGzipOutputStream extends CompressorStream {
+
+    private static final int GZIP_HEADER_LENGTH = 10;
+
+    /**
+     * Fixed ten-byte gzip header. See {@link GZIPOutputStream}'s source for
+     * details.
+     */
+    private static final byte[] GZIP_HEADER;
+
+    static {
+      // Capture the fixed ten-byte header hard-coded in GZIPOutputStream.
+      ByteArrayOutputStream baos = new ByteArrayOutputStream();
+      byte[] header = null;
+      GZIPOutputStream gzipStream = null;
+      try {
+        gzipStream = new GZIPOutputStream(baos);
+        gzipStream.finish();
+        header = Arrays.copyOfRange(baos.toByteArray(), 0, GZIP_HEADER_LENGTH);
+      } catch (IOException e) {
+        throw new RuntimeException("Could not create gzip stream", e);
+      } finally {
+        if (gzipStream != null) {
+          try {
+            gzipStream.close();
+          } catch (IOException e) {
+            LOG.error(e);
+          }
+        }
+      }
+      GZIP_HEADER = header;
+    }
+
+    private static class ResetableGZIPOutputStream extends GZIPOutputStream {
+      public ResetableGZIPOutputStream(OutputStream out) throws IOException {
+        super(out);
+      }
+
+      public void resetState() throws IOException {
+        def.reset();
+        crc.reset();
+        out.write(GZIP_HEADER);
+      }
+    }
+
+    public ReusableGzipOutputStream(OutputStream out) throws IOException {
+      super(new ResetableGZIPOutputStream(out));
+    }
+
+    @Override
+    public void close() throws IOException {
+      out.close();
+    }
+
+    @Override
+    public void flush() throws IOException {
+      out.flush();
+    }
+
+    @Override
+    public void write(int b) throws IOException {
+      out.write(b);
+    }
+
+    @Override
+    public void write(byte[] data, int offset, int length) throws IOException {
+      out.write(data, offset, length);
+    }
+
+    @Override
+    public void finish() throws IOException {
+      ((GZIPOutputStream) out).finish();
+    }
+
+    @Override
+    public void resetState() throws IOException {
+      ((ResetableGZIPOutputStream) out).resetState();
+    }
+  }
+
+  @Override
+  public CompressionOutputStream createOutputStream(OutputStream out)
+      throws IOException {
+    if (ZlibFactory.isNativeZlibLoaded(getConf())) {
+      int bufferSize = getConf().getInt("io.file.buffer.size",
+          NATIVE_GZIP_DEFAULT_BUFFER_SIZE);
+      return new CompressorStream(out, createCompressor(), bufferSize);
+    } else {
+      return new ReusableGzipOutputStream(out);
+    }
+  }
+
+}
diff --git src/test/java/org/apache/hadoop/hbase/io/hfile/TestHFileBlock.java src/test/java/org/apache/hadoop/hbase/io/hfile/TestHFileBlock.java
index e60e617..6556bbb 100644
--- src/test/java/org/apache/hadoop/hbase/io/hfile/TestHFileBlock.java
+++ src/test/java/org/apache/hadoop/hbase/io/hfile/TestHFileBlock.java
@@ -75,9 +75,6 @@ public class TestHFileBlock {
   static final Compression.Algorithm[] COMPRESSION_ALGORITHMS =
       { NONE, GZ };
 
-  // In case we need to temporarily switch some test cases to just test gzip.
-  static final Compression.Algorithm[] GZIP_ONLY = { GZ };
-
   private static final int NUM_TEST_BLOCKS = 1000;
 
   private static final int NUM_READER_THREADS = 26;
@@ -206,14 +203,16 @@ public class TestHFileBlock {
     return headerAndData;
   }
 
-  public String createTestBlockStr(Compression.Algorithm algo)
-      throws IOException {
+  public String createTestBlockStr(Compression.Algorithm algo,
+      int correctLength) throws IOException {
     byte[] testV2Block = createTestV2Block(algo);
     int osOffset = HFileBlock.HEADER_SIZE + 9;
-    if (osOffset < testV2Block.length) {
+    if (testV2Block.length == correctLength) {
       // Force-set the "OS" field of the gzip header to 3 (Unix) to avoid
       // variations across operating systems.
       // See http://www.gzip.org/zlib/rfc-gzip.html for gzip format.
+      // We only make this change when the compressed block length matches.
+      // Otherwise, there are obviously other inconsistencies.
       testV2Block[osOffset] = 3;
     }
     return Bytes.toStringBinary(testV2Block);
@@ -226,7 +225,7 @@ public class TestHFileBlock {
 
   @Test
   public void testGzipCompression() throws IOException {
-    assertEquals(
+    final String correctTestBlockStr =
         "DATABLK*\\x00\\x00\\x00:\\x00\\x00\\x0F\\xA0\\xFF\\xFF\\xFF\\xFF"
             + "\\xFF\\xFF\\xFF\\xFF"
             // gzip-compressed block: http://www.gzip.org/zlib/rfc-gzip.html
@@ -240,8 +239,10 @@ public class TestHFileBlock {
             + "\\x03"
             + "\\xED\\xC3\\xC1\\x11\\x00 \\x08\\xC00DD\\xDD\\x7Fa"
             + "\\xD6\\xE8\\xA3\\xB9K\\x84`\\x96Q\\xD3\\xA8\\xDB\\xA8e\\xD4c"
-            + "\\xD46\\xEA5\\xEA3\\xEA7\\xE7\\x00LI\\s\\xA0\\x0F\\x00\\x00",
-        createTestBlockStr(GZ));
+            + "\\xD46\\xEA5\\xEA3\\xEA7\\xE7\\x00LI\\s\\xA0\\x0F\\x00\\x00";
+    final int correctGzipBlockLength = 82;
+    assertEquals(correctTestBlockStr, createTestBlockStr(GZ,
+        correctGzipBlockLength));
   }
-- 
1.7.4.4
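
For readers outside the HBase tree, the essence of the fix is small. Hadoop's pure-Java gzip path builds a new GZIPOutputStream, and with it a new native Deflater, for every block it compresses; those Deflaters hold native memory until finalization, which is the leak named in the subject line. ResetableGZIPOutputStream above avoids this by capturing the fixed ten-byte gzip header once and rewinding a single stream between blocks. The standalone sketch below distills that trick; the class name GzipReuseSketch is invented for illustration and is not part of the patch.

import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.OutputStream;
import java.util.Arrays;
import java.util.zip.GZIPOutputStream;

/** Minimal sketch of the reuse trick; not the patch's exact class. */
public class GzipReuseSketch extends GZIPOutputStream {

  /** The fixed ten-byte header that GZIPOutputStream emits on construction. */
  private static final byte[] GZIP_HEADER;

  static {
    try {
      // Compress an empty stream once and keep its first ten bytes.
      ByteArrayOutputStream baos = new ByteArrayOutputStream();
      try (GZIPOutputStream gz = new GZIPOutputStream(baos)) {
        gz.finish(); // header + empty deflate block + trailer land in baos
      }
      GZIP_HEADER = Arrays.copyOfRange(baos.toByteArray(), 0, 10);
    } catch (IOException e) {
      throw new ExceptionInInitializerError(e);
    }
  }

  public GzipReuseSketch(OutputStream out) throws IOException {
    super(out);
  }

  /** Rewinds this stream for the next block instead of allocating a new one. */
  public void resetState() throws IOException {
    def.reset();            // reuse the one native Deflater; no new allocation
    crc.reset();            // the next gzip member gets a fresh CRC32
    out.write(GZIP_HEADER); // every gzip member starts with the fixed header
  }
}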
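
And a hypothetical driver mirroring the per-block sequence that HFileBlock.Writer.doCompression() runs after this patch: reset the byte buffer, reset the gzip state, write, finish, copy the block out. GzipReuseDemo and its sample blocks are made up for the example; only the call sequence reflects the patch.

import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.nio.charset.StandardCharsets;

public class GzipReuseDemo {
  public static void main(String[] args) throws IOException {
    // One buffer and one gzip stream (hence one Deflater) for all blocks.
    ByteArrayOutputStream compressedByteStream = new ByteArrayOutputStream();
    GzipReuseSketch gzipOut = new GzipReuseSketch(compressedByteStream);

    String[] blocks = { "first block", "second block", "third block" };
    for (String block : blocks) {
      compressedByteStream.reset(); // drop the previous block's bytes
      gzipOut.resetState();         // new gzip member, same Deflater
      gzipOut.write(block.getBytes(StandardCharsets.UTF_8));
      gzipOut.finish();             // deflate the tail and write the trailer
      byte[] onDisk = compressedByteStream.toByteArray();
      System.out.println(block + " -> " + onDisk.length + " compressed bytes");
    }
  }
}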