Index: src/main/java/org/apache/hadoop/hbase/io/hfile/Compression.java
===================================================================
--- src/main/java/org/apache/hadoop/hbase/io/hfile/Compression.java (revision 1295407)
+++ src/main/java/org/apache/hadoop/hbase/io/hfile/Compression.java (working copy)
@@ -86,7 +86,7 @@
       try {
         Class externalCodec = ClassLoader.getSystemClassLoader()
             .loadClass("com.hadoop.compression.lzo.LzoCodec");
-        lzoCodec = (CompressionCodec) ReflectionUtils.newInstance(externalCodec, 
+        lzoCodec = (CompressionCodec) ReflectionUtils.newInstance(externalCodec,
            new Configuration(conf));
      } catch (ClassNotFoundException e) {
        throw new RuntimeException(e);
@@ -101,7 +101,7 @@
     @Override
     DefaultCodec getCodec(Configuration conf) {
       if (codec == null) {
-        codec = new GzipCodec();
+        codec = new ReusableStreamGzipCodec();
         codec.setConf(new Configuration(conf));
       }
 
@@ -176,7 +176,6 @@
     public OutputStream createCompressionStream(
         OutputStream downStream, Compressor compressor,
         int downStreamBufferSize) throws IOException {
-      CompressionCodec codec = getCodec(conf);
       OutputStream bos1 = null;
       if (downStreamBufferSize > 0) {
         bos1 = new BufferedOutputStream(downStream, downStreamBufferSize);
@@ -184,15 +183,25 @@
       else {
         bos1 = downStream;
       }
-      ((Configurable)codec).getConf().setInt("io.file.buffer.size", 32 * 1024);
-      CompressionOutputStream cos =
-          codec.createOutputStream(bos1, compressor);
+      CompressionOutputStream cos = createPlainCompressionStream(bos1,
+          compressor);
       BufferedOutputStream bos2 =
           new BufferedOutputStream(new FinishOnFlushCompressionStream(cos),
               DATA_OBUF_SIZE);
       return bos2;
     }
 
+    /**
+     * Creates a compression stream without any additional wrapping into
+     * buffering streams.
+     */
+    CompressionOutputStream createPlainCompressionStream(
+        OutputStream downStream, Compressor compressor) throws IOException {
+      CompressionCodec codec = getCodec(conf);
+      ((Configurable) codec).getConf().setInt("io.file.buffer.size", 32 * 1024);
+      return codec.createOutputStream(downStream, compressor);
+    }
+
     public Compressor getCompressor() {
       CompressionCodec codec = getCodec(conf);
       if (codec != null) {
Index: src/main/java/org/apache/hadoop/hbase/io/hfile/HFile.java
===================================================================
--- src/main/java/org/apache/hadoop/hbase/io/hfile/HFile.java (revision 1295407)
+++ src/main/java/org/apache/hadoop/hbase/io/hfile/HFile.java (working copy)
@@ -20,6 +20,7 @@
 package org.apache.hadoop.hbase.io.hfile;
 
 import java.io.BufferedInputStream;
+import java.io.BufferedOutputStream;
 import java.io.Closeable;
 import java.io.DataInputStream;
 import java.io.DataOutputStream;
@@ -64,6 +65,7 @@
 import org.apache.hadoop.io.IOUtils;
 import org.apache.hadoop.io.RawComparator;
 import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.compress.CompressionOutputStream;
 import org.apache.hadoop.io.compress.Compressor;
 import org.apache.hadoop.io.compress.Decompressor;
 
@@ -198,6 +200,8 @@
    * HFile Writer.
    */
   public static class Writer implements Closeable {
+    // Gzip header size
+    private static final int HEADER_SIZE = 10;
     // FileSystem stream to write on.
     private FSDataOutputStream outputStream;
     // True if we opened the outputStream (and so will close it).
@@ -260,6 +264,12 @@
     // May be null if we were passed a stream.
     private Path path = null;
 
+    /** Compression output stream */
+    private CompressionOutputStream compressionStream;
+
+    /** Buffered output stream used for gzip compression. */
+    private BufferedOutputStream compressionBos;
+
     /**
      * Constructor that uses all defaults for compression and block size.
      * @param fs
@@ -380,11 +390,34 @@
     private void newBlock() throws IOException {
       // This is where the next block begins.
       blockBegin = outputStream.getPos();
+
+      if (this.compressAlgo.equals(Compression.Algorithm.GZ) && blockBegin > 0) {
+        blockBegin -= HEADER_SIZE;
+      }
+
+      if (compressionBos == null
+          && this.compressAlgo.equals(Compression.Algorithm.GZ)) {
+        createCompressionStream();
+      }
       this.out = getCompressingStream();
       this.out.write(DATABLOCKMAGIC);
       firstKey = null;
     }
 
+    private void createCompressionStream() {
+      this.compressor = this.compressAlgo.getCompressor();
+      try {
+        compressionStream = this.compressAlgo.createPlainCompressionStream(
+            this.outputStream, compressor);
+        this.compressionBos = new BufferedOutputStream(
+            new Compression.FinishOnFlushCompressionStream(compressionStream),
+            4 * 1024);
+      } catch (IOException e) {
+        throw new RuntimeException("Could not create compression stream "
+            + "for algorithm " + compressAlgo, e);
+      }
+    }
+
     /*
      * Sets up a compressor and creates a compression stream on top of
      * this.outputStream. Get one per block written.
@@ -394,7 +427,6 @@
      * @see {@link #releaseCompressingStream(DataOutputStream)}
      */
     private DataOutputStream getCompressingStream() throws IOException {
-      this.compressor = compressAlgo.getCompressor();
       // Get new DOS compression stream. In tfile, the DOS, is not closed,
       // just finished, and that seems to be fine over there. TODO: Check
       // no memory retention of the DOS. Should I disable the 'flush' on the
@@ -403,10 +435,14 @@
       // compressed downstream should be only when done. I was going to but
       // looks like when we call flush in here, its legitimate flush that
       // should go through to the compressor.
-      OutputStream os =
-          this.compressAlgo.createCompressionStream(this.outputStream,
-              this.compressor, 0);
-      return new DataOutputStream(os);
+      if (compressionBos == null) {
+        this.compressor = compressAlgo.getCompressor();
+        OutputStream os =
+            this.compressAlgo.createCompressionStream(this.outputStream,
+                this.compressor, 0);
+        return new DataOutputStream(os);
+      }
+      return new DataOutputStream(compressionBos);
     }
 
     /*
@@ -420,8 +456,6 @@
     private int releaseCompressingStream(final DataOutputStream dos)
     throws IOException {
       dos.flush();
-      this.compressAlgo.returnCompressor(this.compressor);
-      this.compressor = null;
       return dos.size();
     }
 
@@ -440,7 +474,7 @@
       for (i = 0; i < metaNames.size(); ++i) {
         // stop when the current key is greater than our own
         byte[] cur = metaNames.get(i);
-        if (Bytes.BYTES_RAWCOMPARATOR.compare(cur, 0, cur.length, 
+        if (Bytes.BYTES_RAWCOMPARATOR.compare(cur, 0, cur.length,
            key, 0, key.length) > 0) {
          break;
        }
@@ -653,8 +687,12 @@
 
       if (this.closeOutputStream) {
         this.outputStream.close();
-        this.outputStream = null;
       }
+      this.compressAlgo.returnCompressor(this.compressor);
+      this.compressor = null;
+      this.outputStream = null;
+      this.compressionStream = null;
+      this.compressionBos = null;
     }
 
     /*
Index: src/main/java/org/apache/hadoop/hbase/io/hfile/ReusableStreamGzipCodec.java
===================================================================
--- src/main/java/org/apache/hadoop/hbase/io/hfile/ReusableStreamGzipCodec.java (revision 0)
+++ src/main/java/org/apache/hadoop/hbase/io/hfile/ReusableStreamGzipCodec.java (revision 0)
@@ -0,0 +1,133 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.hadoop.hbase.io.hfile;
+
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.io.OutputStream;
+import java.util.Arrays;
+import java.util.zip.GZIPOutputStream;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.io.compress.CompressionOutputStream;
+import org.apache.hadoop.io.compress.CompressorStream;
+import org.apache.hadoop.io.compress.GzipCodec;
+import org.apache.hadoop.io.compress.zlib.ZlibFactory;
+
+/**
+ * Fixes an inefficiency in Hadoop's Gzip codec by allowing compression
+ * streams to be reused.
+ */
+public class ReusableStreamGzipCodec extends GzipCodec {
+
+  private static final Log LOG =
+      LogFactory.getLog(ReusableStreamGzipCodec.class);
+
+  /**
+   * A bridge that wraps around a DeflaterOutputStream to make it a
+   * CompressionOutputStream.
+   */
+  protected static class ReusableGzipOutputStream extends CompressorStream {
+
+    private static final int GZIP_HEADER_LENGTH = 10;
+
+    /**
+     * Fixed ten-byte gzip header. See {@link GZIPOutputStream}'s source for
+     * details.
+     */
+    private static final byte[] GZIP_HEADER;
+
+    static {
+      // Capture the fixed ten-byte header hard-coded in GZIPOutputStream.
+      ByteArrayOutputStream baos = new ByteArrayOutputStream();
+      byte[] header = null;
+      GZIPOutputStream gzipStream = null;
+      try {
+        gzipStream = new GZIPOutputStream(baos);
+        gzipStream.finish();
+        header = Arrays.copyOfRange(baos.toByteArray(), 0, GZIP_HEADER_LENGTH);
+      } catch (IOException e) {
+        throw new RuntimeException("Could not create gzip stream", e);
+      } finally {
+        if (gzipStream != null) {
+          try {
+            gzipStream.close();
+          } catch (IOException e) {
+            LOG.error(e);
+          }
+        }
+      }
+      GZIP_HEADER = header;
+    }
+
+    private static class ResetableGZIPOutputStream extends GZIPOutputStream {
+      public ResetableGZIPOutputStream(OutputStream out) throws IOException {
+        super(out);
+      }
+
+      public void resetState() throws IOException {
+        def.reset();
+        crc.reset();
+        out.write(GZIP_HEADER);
+      }
+    }
+
+    public ReusableGzipOutputStream(OutputStream out) throws IOException {
+      super(new ResetableGZIPOutputStream(out));
+    }
+
+    @Override
+    public void close() throws IOException {
+      out.close();
+    }
+
+    @Override
+    public void flush() throws IOException {
+      out.flush();
+    }
+
+    @Override
+    public void write(int b) throws IOException {
+      out.write(b);
+    }
+
+    @Override
+    public void write(byte[] data, int offset, int length) throws IOException {
+      out.write(data, offset, length);
+    }
+
+    @Override
+    public void finish() throws IOException {
+      ((GZIPOutputStream) out).finish();
+    }
+
+    @Override
+    public void resetState() throws IOException {
+      ((ResetableGZIPOutputStream) out).resetState();
+    }
+  }
+
+  @Override
+  public CompressionOutputStream createOutputStream(OutputStream out)
+      throws IOException {
+    if (ZlibFactory.isNativeZlibLoaded(getConf())) {
+      return super.createOutputStream(out);
+    }
+    return new ReusableGzipOutputStream(out);
+  }
+}
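
Reviewer note, not part of the patch: the core trick above is that GZIPOutputStream keeps its Deflater ("def") and CRC32 ("crc") in protected fields, so a subclass can reset both and re-emit the fixed ten-byte gzip header to start a new gzip member on the same underlying stream. Because Compression.FinishOnFlushCompressionStream finishes and then resets the compression stream on every flush, the next block's header is written while flushing the previous one, which is why newBlock() backs blockBegin up by HEADER_SIZE for every GZ block after the first.

Below is a minimal, self-contained sketch of the reuse idea against plain java.util.zip, with no Hadoop dependencies. The names GzipReuseSketch and ResettableGzipStream are illustrative only, and the header bytes are hard-coded here, whereas the patch captures them from a throwaway GZIPOutputStream in a static initializer.

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.OutputStream;
import java.nio.charset.StandardCharsets;
import java.util.zip.GZIPInputStream;
import java.util.zip.GZIPOutputStream;

public class GzipReuseSketch {

  /** Minimal stand-in for the patch's ResetableGZIPOutputStream. */
  static class ResettableGzipStream extends GZIPOutputStream {

    // Fixed gzip header: magic (0x1f 0x8b), CM = 8 (deflate), then zeroed
    // flags, mtime, extra flags, and OS byte. Hard-coded for brevity; the
    // patch captures the same bytes from a throwaway GZIPOutputStream.
    private static final byte[] GZIP_HEADER = {
        (byte) 0x1f, (byte) 0x8b, 8, 0, 0, 0, 0, 0, 0, 0 };

    ResettableGzipStream(OutputStream out) throws IOException {
      super(out); // the constructor writes the first header itself
    }

    /** Re-arms this stream to write another gzip member. */
    void resetState() throws IOException {
      def.reset();            // protected Deflater from DeflaterOutputStream
      crc.reset();            // protected CRC32 from GZIPOutputStream
      out.write(GZIP_HEADER); // start the next member
    }
  }

  public static void main(String[] args) throws IOException {
    ByteArrayOutputStream sink = new ByteArrayOutputStream();
    ResettableGzipStream gz = new ResettableGzipStream(sink);

    // First "block": finish() emits the remaining deflate data and the
    // CRC/size trailer, but leaves the underlying stream open.
    gz.write("block one, ".getBytes(StandardCharsets.UTF_8));
    gz.finish();

    // Reuse the same stream object for a second gzip member.
    gz.resetState();
    gz.write("block two".getBytes(StandardCharsets.UTF_8));
    gz.finish();
    gz.close();

    // GZIPInputStream decompresses concatenated members as one stream.
    GZIPInputStream in = new GZIPInputStream(
        new ByteArrayInputStream(sink.toByteArray()));
    StringBuilder decoded = new StringBuilder();
    byte[] buf = new byte[64];
    int n;
    while ((n = in.read(buf)) > 0) {
      decoded.append(new String(buf, 0, n, StandardCharsets.UTF_8));
    }
    in.close();
    System.out.println(decoded); // prints "block one, block two"
  }
}

Each finish()/resetState() pair yields one complete gzip member, and GZIPInputStream reads concatenated members transparently, so every block written through the reused stream stays independently decompressible, which is exactly what an HFile reader needs when it seeks to blockBegin.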