Index: hbase-common/src/main/java/org/apache/hadoop/hbase/io/compress/ReusableStreamGzipCodec.java
===================================================================
--- hbase-common/src/main/java/org/apache/hadoop/hbase/io/compress/ReusableStreamGzipCodec.java	(revision 1423268)
+++ hbase-common/src/main/java/org/apache/hadoop/hbase/io/compress/ReusableStreamGzipCodec.java	(working copy)
@@ -25,6 +25,7 @@
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.hbase.util.JVM;
 import org.apache.hadoop.io.compress.CompressionOutputStream;
 import org.apache.hadoop.io.compress.CompressorStream;
 import org.apache.hadoop.io.compress.GzipCodec;
@@ -77,8 +78,18 @@
     }
 
     private static class ResetableGZIPOutputStream extends GZIPOutputStream {
+
+      private static final int TRAILER_SIZE = 8;
+
+      /**
+       * Whether GZIPOutputStream.finish() is broken on this JVM. Evaluated once
+       * at class load instead of on every construction.
+       */
+      private static final boolean HAS_BROKEN_FINISH =
+          new JVM().isGZIPOutputStreamFinishBroken();
+
       public ResetableGZIPOutputStream(OutputStream out) throws IOException {
         super(out);
       }
 
       public void resetState() throws IOException {
@@ -86,6 +97,58 @@
         crc.reset();
         out.write(GZIP_HEADER);
       }
+
+      /**
+       * Overridden because some JDK implementations of finish() call def.end(),
+       * which breaks resetting the stream for reuse. On such JVMs the remaining
+       * data and the gzip trailer are written here without ending the deflater;
+       * on all others the call is delegated to the superclass.
+       */
+      @Override
+      public void finish() throws IOException {
+        if (!HAS_BROKEN_FINISH) {
+          super.finish();
+          return;
+        }
+        if (def.finished()) {
+          return;
+        }
+        def.finish();
+        while (!def.finished()) {
+          int len = def.deflate(buf, 0, buf.length);
+          if (def.finished() && len <= buf.length - TRAILER_SIZE) {
+            // Last deflater buffer: the trailer fits right behind the data.
+            writeTrailer(buf, len);
+            out.write(buf, 0, len + TRAILER_SIZE);
+            return;
+          }
+          if (len > 0) {
+            out.write(buf, 0, len);
+          }
+        }
+        // The trailer did not fit into the last deflater buffer; write it separately.
+        byte[] trailer = new byte[TRAILER_SIZE];
+        writeTrailer(trailer, 0);
+        out.write(trailer);
+      }
+
+      /** Re-implemented because the corresponding JDK method is not accessible here. */
+      private void writeTrailer(byte[] dest, int offset) throws IOException {
+        writeInt((int) crc.getValue(), dest, offset); // CRC-32 of the uncompressed data
+        writeInt(def.getTotalIn(), dest, offset + 4); // number of uncompressed bytes
+      }
+
+      /** Re-implemented because the corresponding JDK method is not accessible here. */
+      private void writeInt(int value, byte[] dest, int offset) throws IOException {
+        writeShort(value & 0xFFFF, dest, offset);
+        writeShort((value >> 16) & 0xFFFF, dest, offset + 2);
+      }
+
+      /** Re-implemented because the corresponding JDK method is not accessible here. */
+      private void writeShort(int value, byte[] dest, int offset) throws IOException {
+        dest[offset] = (byte) (value & 0xFF);
+        dest[offset + 1] = (byte) ((value >> 8) & 0xFF);
+      }
     }
 
     public ReusableGzipOutputStream(OutputStream out) throws IOException {
Index: hbase-common/src/main/java/org/apache/hadoop/hbase/util/JVM.java
===================================================================
--- hbase-common/src/main/java/org/apache/hadoop/hbase/util/JVM.java	(revision 1423268)
+++ hbase-common/src/main/java/org/apache/hadoop/hbase/util/JVM.java	(working copy)
@@ -57,6 +57,7 @@
     System.getProperty("os.name").startsWith("Windows");
   private static final boolean linux = 
     System.getProperty("os.name").startsWith("Linux");
+  private static final String JVMVersion = System.getProperty("java.version");
 
   /**
    * Constructor. Get the running Operating System instance
@@ -76,6 +77,17 @@
     }
     return (ibmvendor ? linux : true);
   }
+
+  /**
+   * Check if the finish() method of GZIPOutputStream is broken.
+   * The check is true only when {@code ibmvendor} is set and the
+   * {@code java.version} system property contains "1.6.0".
+   *
+   * @return whether GZIPOutputStream.finish() is broken.
+   */
+  public boolean isGZIPOutputStreamFinishBroken() {
+    return ibmvendor && JVMVersion.contains("1.6.0");
+  }
 
   /**
    * Load the implementation of UnixOperatingSystemMXBean for Oracle jvm